diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 2ea5b5adae74b..89b13fdf868e0 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -31,14 +31,11 @@ flup-py3,Vendor,BSD-3-Clause,"Copyright (c) 2005, 2006 Allan Saddi -gssapi,PyPI,ISC,"Copyright (c) 2014, The Python GSSAPI Team" immutables,PyPI,Apache-2.0,Copyright 2018-present Contributors to the immutables project. importlib-metadata,PyPI,Apache-2.0,Copyright Jason R. Coombs in-toto,PyPI,Apache-2.0,Copyright 2018 New York University ipaddress,PyPI,PSF,Copyright (c) 2013 Philipp Hagemeister jellyfish,PyPI,BSD-3-Clause,Copyright (c) 2015 James Turk -kafka-python,PyPI,Apache-2.0,Copyright 2015 David Arthur -kazoo,PyPI,Apache-2.0,Copyright 2012 Kazoo team kubernetes,PyPI,Apache-2.0,Copyright 2014 The Kubernetes Authors. ldap3,PyPI,LGPL-3.0-only,Copyright 2013 - 2020 Giovanni Cannata lxml,PyPI,BSD-3-Clause,Copyright (c) 2004 Infrae. All rights reserved. diff --git a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in index b9acfad3f2a48..89ab1f877d97b 100644 --- a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in +++ b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in @@ -27,7 +27,6 @@ enum34==1.1.10; python_version < '3.0' foundationdb==6.3.24; python_version > '3.0' futures==3.4.0; python_version < '3.0' gearman==2.0.2; sys_platform != 'win32' and python_version < '3.0' -gssapi==1.8.2; python_version > '3.0' immutables==0.19; python_version > '3.0' importlib-metadata==2.1.3; python_version < '3.8' in-toto==1.0.1; python_version > '3.0' @@ -35,8 +34,6 @@ ipaddress==1.0.23; python_version < '3.0' jaydebeapi==1.2.3 jellyfish==0.9.0; python_version > '3.0' jpype1==1.4.1; python_version > '3.0' -kafka-python==2.0.2 -kazoo==2.9.0 kubernetes==18.20.0; python_version < '3.0' kubernetes==26.1.0; python_version > '3.0' ldap3==2.9.1 diff --git a/kafka_consumer/assets/configuration/spec.yaml b/kafka_consumer/assets/configuration/spec.yaml index 400898760b65b..c50e53cb2f741 100644 --- a/kafka_consumer/assets/configuration/spec.yaml +++ b/kafka_consumer/assets/configuration/spec.yaml @@ -182,14 +182,4 @@ files: value: type: string example: - - name: broker_requests_batch_size - description: | - The OffsetRequests sent to each broker are batched by the kafka_consumer check in groups of 30 by default. - If the batch size is too big, you may see KafkaTimeoutError exceptions in the logs while - running the wakeup calls. - If the batch size is too small, the check will take longer to run. 
- display_default: 30 - value: - type: integer - example: 30 - template: instances/default diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py similarity index 97% rename from kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py rename to kafka_consumer/datadog_checks/kafka_consumer/client.py index 9a1272195d9b4..7856fac783d08 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -6,11 +6,16 @@ from six import string_types from datadog_checks.base import ConfigurationError -from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS -class ConfluentKafkaClient(KafkaClient): +class KafkaClient: + def __init__(self, config, tls_context, log) -> None: + self.config = config + self.log = log + self._kafka_client = None + self._tls_context = tls_context + @property def kafka_client(self): if self._kafka_client is None: @@ -71,8 +76,6 @@ def __get_authentication_config(self): return config def get_highwater_offsets(self, consumer_offsets): - # TODO: Remove broker_requests_batch_size as config after - # kafka-python is removed if we don't need to batch requests in Confluent highwater_offsets = {} topics_with_consumer_offset = {} if not self.config._monitor_all_broker_highwatermarks: diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/__init__.py b/kafka_consumer/datadog_checks/kafka_consumer/client/__init__.py deleted file mode 100644 index e0cc3d0a7662c..0000000000000 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# (C) Datadog, Inc. 2023-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py deleted file mode 100644 index 38cb900beb355..0000000000000 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py +++ /dev/null @@ -1,37 +0,0 @@ -# (C) Datadog, Inc. 
2023-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) - -from datadog_checks.kafka_consumer.client.confluent_kafka_client import ConfluentKafkaClient -from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient -from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient - - -class GenericKafkaClient(KafkaClient): - def __init__(self, config, tls_context, log) -> None: - super().__init__(config, tls_context, log) - self.use_legacy_client = config.use_legacy_client - self.confluent_kafka_client = ( - ConfluentKafkaClient(config, tls_context, log) if not self.use_legacy_client else None - ) - self.python_kafka_client = KafkaPythonClient(config, tls_context, log) - - def get_consumer_offsets(self): - if self.use_legacy_client: - return self.python_kafka_client.get_consumer_offsets() - return self.confluent_kafka_client.get_consumer_offsets() - - def get_highwater_offsets(self, consumer_offsets): - if self.use_legacy_client: - return self.python_kafka_client.get_highwater_offsets(consumer_offsets) - return self.confluent_kafka_client.get_highwater_offsets(consumer_offsets) - - def get_partitions_for_topic(self, topic): - if self.use_legacy_client: - return self.python_kafka_client.get_partitions_for_topic(topic) - return self.confluent_kafka_client.get_partitions_for_topic(topic) - - def request_metadata_update(self): - if self.use_legacy_client: - return self.python_kafka_client.request_metadata_update() - return self.confluent_kafka_client.request_metadata_update() diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py deleted file mode 100644 index 36a75b6a0fb29..0000000000000 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py +++ /dev/null @@ -1,28 +0,0 @@ -# (C) Datadog, Inc. 2023-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) -from abc import ABC, abstractmethod - - -class KafkaClient(ABC): - def __init__(self, config, tls_context, log) -> None: - self.config = config - self.log = log - self._kafka_client = None - self._tls_context = tls_context - - @abstractmethod - def get_consumer_offsets(self): - pass - - @abstractmethod - def get_highwater_offsets(self): - pass - - @abstractmethod - def get_partitions_for_topic(self, topic): - pass - - @abstractmethod - def request_metadata_update(self): - pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py deleted file mode 100644 index 17a6372669dfe..0000000000000 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py +++ /dev/null @@ -1,400 +0,0 @@ -# (C) Datadog, Inc. 
2023-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) -import ssl -from collections import defaultdict - -from kafka import KafkaAdminClient -from kafka import errors as kafka_errors -from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import ListGroupsRequest -from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest -from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse -from kafka.structs import TopicPartition -from six import iteritems, string_types - -from datadog_checks.base import ConfigurationError -from datadog_checks.base.utils.http import AuthTokenOAuthReader -from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient -from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS - - -class OAuthTokenProvider(AbstractTokenProvider): - def __init__(self, **config): - self.reader = AuthTokenOAuthReader(config) - - def token(self): - # Read only if necessary or use cached token - return self.reader.read() or self.reader._token - - -class KafkaPythonClient(KafkaClient): - def get_consumer_offsets(self): - """Fetch Consumer Group offsets from Kafka. - - Also fetch consumer_groups, topics, and partitions if not already specified. - - For speed, all the brokers are queried in parallel using callbacks. - The callback flow is: - A: When fetching all groups ('monitor_unlisted_consumer_groups' is True): - 1. Issue a ListGroupsRequest to every broker - 2. Attach a callback to each ListGroupsRequest that issues OffsetFetchRequests for every group. - Note: Because a broker only returns groups for which it is the coordinator, as an optimization we - skip the FindCoordinatorRequest - B: When fetching only listed groups: - 1. Issue a FindCoordintorRequest for each group - 2. Attach a callback to each FindCoordinatorResponse that issues OffsetFetchRequests for that group - Both: - 3. Attach a callback to each OffsetFetchRequest that parses the response - and saves the consumer group's offsets - """ - # Store the list of futures on the object because some of the callbacks create/store additional futures and they - # don't have access to variables scoped to this method, only to the object scope - self._consumer_futures = [] - consumer_offsets = {} - - if self.config._monitor_unlisted_consumer_groups: - for broker in self.kafka_client._client.cluster.brokers(): - # FIXME: This is using a workaround to skip socket wakeup, which causes blocking - # (see https://github.com/dpkp/kafka-python/issues/2286). - # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official - # implementation for this function instead. - list_groups_future = self._list_consumer_groups_send_request(broker.nodeId) - list_groups_future.add_callback(self._list_groups_callback, broker.nodeId, consumer_offsets) - self._consumer_futures.append(list_groups_future) - elif self.config._consumer_groups: - self._validate_consumer_groups() - for consumer_group in self.config._consumer_groups: - find_coordinator_future = self._find_coordinator_id_send_request(consumer_group) - find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group, consumer_offsets) - self._consumer_futures.append(find_coordinator_future) - else: - raise ConfigurationError( - "Cannot fetch consumer offsets because no consumer_groups are specified and " - "monitor_unlisted_consumer_groups is %s." 
% self.config._monitor_unlisted_consumer_groups - ) - - # Loop until all futures resolved. - self.kafka_client._wait_for_futures(self._consumer_futures) - del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs - - return consumer_offsets - - def get_highwater_offsets(self, consumer_offsets): - """Fetch highwater offsets for topic_partitions in the Kafka cluster. - - Do this for all partitions in the cluster because even if it has no consumers, we may want to measure whether - producers are successfully producing. - - If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise highwater - mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer - offset. - - Internal Kafka topics like __consumer_offsets, __transaction_state, etc are always excluded. - - Any partitions that don't currently have a leader will be skipped. - - Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader: - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset) - - For speed, all the brokers are queried in parallel using callbacks. The callback flow is: - 1. Issue an OffsetRequest to every broker - 2. Attach a callback to each OffsetResponse that parses the response and saves the highwater offsets. - """ - highwater_futures = [] # No need to store on object because the callbacks don't create additional futures - - # If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for - # which this run of the check has at least once saved consumer offset. This is later used as a filter for - # excluding partitions. - - highwater_offsets = {} - - if not self.config._monitor_all_broker_highwatermarks: - tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in consumer_offsets} - - for batch in self.batchify( - self.kafka_client._client.cluster.brokers(), self.config._broker_requests_batch_size - ): - for broker in batch: - broker_led_partitions = self.kafka_client._client.cluster.partitions_for_broker(broker.nodeId) - if broker_led_partitions is None: - continue - - # Take the partitions for which this broker is the leader and group them by topic in order to construct - # the OffsetRequest while simultaneously filtering out partitions we want to exclude - partitions_grouped_by_topic = defaultdict(list) - for topic, partition in broker_led_partitions: - # No sense fetching highwater offsets for internal topics - if topic not in KAFKA_INTERNAL_TOPICS and ( - self.config._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset - ): - partitions_grouped_by_topic[topic].append(partition) - - # Construct the OffsetRequest - max_offsets = 1 - request = OffsetRequest[0]( - replica_id=-1, - topics=[ - (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions]) - for topic, partitions in partitions_grouped_by_topic.items() - ], - ) - - # We can disable wakeup here because it is the same thread doing both polling and sending. Also, it - # is possible that the wakeup itself could block if a large number of sends were processed beforehand. 
- highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False) - - highwater_future.add_callback(self._highwater_offsets_callback, highwater_offsets) - highwater_futures.append(highwater_future) - - # Loop until all futures resolved. - self.kafka_client._wait_for_futures(highwater_futures) - - return highwater_offsets - - def create_kafka_admin_client(self): - crlfile = self.config._crlfile - if crlfile: - self._tls_context.load_verify_locations(crlfile) - self._tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF - - return KafkaAdminClient( - bootstrap_servers=self.config._kafka_connect_str, - client_id='dd-agent', - request_timeout_ms=self.config._request_timeout_ms, - # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for - # broker version during the bootstrapping process. Note that this returns the first version found, so in - # a mixed-version cluster this will be a non-deterministic result. - api_version=self.config._kafka_version, - # While we check for SASL/SSL params, if not present they will default to the kafka-python values for - # plaintext connections - security_protocol=self.config._security_protocol, - sasl_mechanism=self.config._sasl_mechanism, - sasl_plain_username=self.config._sasl_plain_username, - sasl_plain_password=self.config._sasl_plain_password, - sasl_kerberos_service_name=self.config._sasl_kerberos_service_name, - sasl_kerberos_domain_name=self.config._sasl_kerberos_domain_name, - sasl_oauth_token_provider=( - OAuthTokenProvider(**self.config._sasl_oauth_token_provider) - if 'sasl_oauth_token_provider' in self.config.instance - else None - ), - ssl_context=self._tls_context, - ) - - def _create_kafka_admin_client(self, api_version): - """Return a KafkaAdminClient.""" - # TODO accept None (which inherits kafka-python default of localhost:9092) - kafka_admin_client = self.create_kafka_admin_client() - self.log.debug("KafkaAdminClient api_version: %s", kafka_admin_client.config['api_version']) - # Force initial population of the local cluster metadata cache - kafka_admin_client._client.poll(future=kafka_admin_client._client.cluster.request_update()) - if kafka_admin_client._client.cluster.topics(exclude_internal_topics=False) is None: - raise RuntimeError("Local cluster metadata cache did not populate.") - return kafka_admin_client - - @property - def kafka_client(self): - if self._kafka_client is None: - # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for - # broker version during the bootstrapping process. Note that this returns the first version found, so in - # a mixed-version cluster this will be a non-deterministic result. - kafka_version = self.config._kafka_version - if isinstance(kafka_version, str): - kafka_version = tuple(map(int, kafka_version.split("."))) - - self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version) - return self._kafka_client - - def _highwater_offsets_callback(self, highwater_offsets, response): - """Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict.""" - if type(response) not in OffsetResponse: - raise RuntimeError("response type should be OffsetResponse, but instead was %s." 
% type(response)) - for topic, partitions_data in response.topics: - for partition, error_code, offsets in partitions_data: - error_type = kafka_errors.for_code(error_code) - if error_type is kafka_errors.NoError: - highwater_offsets[(topic, partition)] = offsets[0] - elif error_type is kafka_errors.NotLeaderForPartitionError: - self.log.warning( - "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen " - "if the broker that was the partition leader when kafka_admin_client last fetched metadata is " - "no longer the leader.", - error_type.message, - error_type.errno, - topic, - partition, - ) - self.kafka_client._client.cluster.request_update() # force metadata update on next poll() - elif error_type is kafka_errors.UnknownTopicOrPartitionError: - self.log.warning( - "Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only " - "happen if the topic is currently being deleted or the check configuration lists non-existent " - "topic partitions.", - error_type.message, - error_type.errno, - topic, - partition, - ) - else: - raise error_type( - "Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, " - "partition: %s." % (topic, partition) - ) - - @staticmethod - def batchify(iterable, batch_size): - iterable = list(iterable) - return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size)) - - # FIXME: This is using a workaround to skip socket wakeup, which causes blocking - # (see https://github.com/dpkp/kafka-python/issues/2286). - # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official - # implementation for this function instead. - def _send_request_to_node(self, node_id, request, wakeup=True): - while not self.kafka_client._client.ready(node_id): - # poll until the connection to broker is ready, otherwise send() - # will fail with NodeNotReadyError - self.kafka_client._client.poll() - return self.kafka_client._client.send(node_id, request, wakeup=wakeup) - - def _validate_consumer_groups(self): - """Validate any explicitly specified consumer groups. - - consumer_groups = {'consumer_group': {'topic': [0, 1]}} - """ - assert isinstance(self.config._consumer_groups, dict) - for consumer_group, topics in self.config._consumer_groups.items(): - assert isinstance(consumer_group, string_types) - assert isinstance(topics, dict) or topics is None # topics are optional - if topics is not None: - for topic, partitions in topics.items(): - assert isinstance(topic, string_types) - assert isinstance(partitions, (list, tuple)) or partitions is None # partitions are optional - if partitions is not None: - for partition in partitions: - assert isinstance(partition, int) - - def _list_groups_callback(self, broker_id, consumer_offsets, response): - """Callback that takes a ListGroupsResponse and issues an OffsetFetchRequest for each group. - - broker_id must be manually passed in because it is not present in the response. Keeping track of the broker that - gave us this response lets us skip issuing FindCoordinatorRequests because Kafka brokers only include - consumer groups in their ListGroupsResponse when they are the coordinator for that group. 
- """ - for consumer_group, group_type in self.kafka_client._list_consumer_groups_process_response(response): - # consumer groups from Kafka < 0.9 that store their offset in Kafka don't use Kafka for group-coordination - # so their group_type is empty - if group_type in ('consumer', ''): - single_group_offsets_future = self._list_consumer_group_offsets_send_request( - group_id=consumer_group, group_coordinator_id=broker_id - ) - single_group_offsets_future.add_callback( - self._single_group_offsets_callback, consumer_group, consumer_offsets - ) - self._consumer_futures.append(single_group_offsets_future) - - def _find_coordinator_callback(self, consumer_group, consumer_offsets, response): - """Callback that takes a FindCoordinatorResponse and issues an OffsetFetchRequest for the group. - - consumer_group must be manually passed in because it is not present in the response, but we need it in order to - associate these offsets to the proper consumer group. - - The OffsetFetchRequest is scoped to the topics and partitions that are specified in the check config. If - topics are unspecified, it will fetch all known offsets for that consumer group. Similarly, if the partitions - are unspecified for a topic listed in the config, offsets are fetched for all the partitions within that topic. - """ - coordinator_id = self.kafka_client._find_coordinator_id_process_response(response) - topics = self.config._consumer_groups[consumer_group] - if not topics: - topic_partitions = None # None signals to fetch all known offsets for the consumer group - else: - # transform [("t1", [1, 2])] into [TopicPartition("t1", 1), TopicPartition("t1", 2)] - topic_partitions = [] - for topic, partitions in topics.items(): - if not partitions: # If partitions aren't specified, fetch all partitions in the topic - partitions = self.kafka_client._client.cluster.partitions_for_topic(topic) - topic_partitions.extend([TopicPartition(topic, p) for p in partitions]) - single_group_offsets_future = self._list_consumer_group_offsets_send_request( - group_id=consumer_group, group_coordinator_id=coordinator_id, partitions=topic_partitions - ) - single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group, consumer_offsets) - self._consumer_futures.append(single_group_offsets_future) - - def _single_group_offsets_callback(self, consumer_group, consumer_offsets, response): - """Callback that parses an OffsetFetchResponse and saves it to the consumer_offsets dict. - - consumer_group must be manually passed in because it is not present in the response, but we need it in order to - associate these offsets to the proper consumer group. - """ - single_group_offsets = self.kafka_client._list_consumer_group_offsets_process_response(response) - self.log.debug("Single group offsets: %s", single_group_offsets) - for (topic, partition), (offset, _metadata) in single_group_offsets.items(): - # If the OffsetFetchRequest explicitly specified partitions, the offset could returned as -1, meaning there - # is no recorded offset for that partition... for example, if the partition doesn't exist in the cluster. - # So ignore it. 
- if offset == -1: - self.kafka_client._client.cluster.request_update() # force metadata update on next poll() - continue - key = (consumer_group, topic, partition) - consumer_offsets[key] = offset - - def _list_consumer_groups_send_request(self, broker_id): - kafka_version = self.kafka_client._matching_api_version(ListGroupsRequest) - if kafka_version <= 2: - request = ListGroupsRequest[kafka_version]() - else: - raise NotImplementedError( - "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient.".format(kafka_version) - ) - # Disable wakeup when sending request to prevent blocking send requests - return self._send_request_to_node(broker_id, request, wakeup=False) - - def _find_coordinator_id_send_request(self, group_id): - """Send a FindCoordinatorRequest to a broker. - :param group_id: The consumer group ID. This is typically the group - name as a string. - :return: A message future - """ - version = 0 - request = GroupCoordinatorRequest[version](group_id) - return self._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False) - - def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_id, partitions=None): - """Send an OffsetFetchRequest to a broker. - :param group_id: The consumer group id name for which to fetch offsets. - :param group_coordinator_id: The node_id of the group's coordinator - broker. - :return: A message future - """ - version = self.kafka_client._matching_api_version(OffsetFetchRequest) - if version <= 3: - if partitions is None: - if version <= 1: - raise ValueError( - """OffsetFetchRequest_v{} requires specifying the - partitions for which to fetch offsets. Omitting the - partitions is only supported on brokers >= 0.10.2. - For details, see KIP-88.""".format( - version - ) - ) - topics_partitions = None - else: - # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])] - topics_partitions_dict = defaultdict(set) - for topic, partition in partitions: - topics_partitions_dict[topic].add(partition) - topics_partitions = list(iteritems(topics_partitions_dict)) - request = OffsetFetchRequest[version](group_id, topics_partitions) - else: - raise NotImplementedError( - "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.".format(version) - ) - return self._send_request_to_node(group_coordinator_id, request, wakeup=False) - - def get_partitions_for_topic(self, topic): - return self.kafka_client._client.cluster.partitions_for_topic(topic) - - def request_metadata_update(self): - self.kafka_client._client.cluster.request_update() diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config.py b/kafka_consumer/datadog_checks/kafka_consumer/config.py index 4cc83ab9441a0..a8a4dc4db1088 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/config.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/config.py @@ -6,11 +6,7 @@ from six import string_types from datadog_checks.base import ConfigurationError, is_affirmative -from datadog_checks.kafka_consumer.constants import ( - BROKER_REQUESTS_BATCH_SIZE, - CONTEXT_UPPER_BOUND, - DEFAULT_KAFKA_TIMEOUT, -) +from datadog_checks.kafka_consumer.constants import CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT class KafkaConfig: @@ -24,7 +20,6 @@ def __init__(self, init_config, instance) -> None: instance.get('monitor_all_broker_highwatermarks', False) ) self._consumer_groups = instance.get('consumer_groups', {}) - self._broker_requests_batch_size = instance.get('broker_requests_batch_size', 
BROKER_REQUESTS_BATCH_SIZE) self._kafka_connect_str = instance.get('kafka_connect_str') @@ -48,7 +43,6 @@ def __init__(self, init_config, instance) -> None: self._tls_private_key = instance.get("tls_private_key") self._tls_private_key_password = instance.get("tls_private_key_password") self._tls_validate_hostname = is_affirmative(instance.get("tls_validate_hostname", True)) - self.use_legacy_client = is_affirmative(instance.get('use_legacy_client', False)) if self._tls_cert or self._tls_ca_cert or self._tls_private_key or self._tls_private_key_password: self._tls_verify = True diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py index 944ef72adf499..424fa1dd167db 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/defaults.py @@ -18,10 +18,6 @@ def shared_service(field, value): return get_default_field_value(field, value) -def instance_broker_requests_batch_size(field, value): - return 30 - - def instance_consumer_groups(field, value): return get_default_field_value(field, value) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py index 3c45ddeea4925..bea02385a4e3c 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/config_models/instance.py @@ -40,7 +40,6 @@ class InstanceConfig(BaseModel): class Config: allow_mutation = False - broker_requests_batch_size: Optional[int] consumer_groups: Optional[Mapping[str, Any]] disable_generic_tags: Optional[bool] empty_default_hostname: Optional[bool] diff --git a/kafka_consumer/datadog_checks/kafka_consumer/constants.py b/kafka_consumer/datadog_checks/kafka_consumer/constants.py index d8058d4269ab0..a37a4add15cfe 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/constants.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/constants.py @@ -18,5 +18,3 @@ '_confluent-telemetry-metrics', '_confluent-command', } - -BROKER_REQUESTS_BATCH_SIZE = 30 diff --git a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example index ccc7eb0474fe0..d5f7f3c4b404a 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example +++ b/kafka_consumer/datadog_checks/kafka_consumer/data/conf.yaml.example @@ -207,14 +207,6 @@ instances: # # tls_crlfile: - ## @param broker_requests_batch_size - integer - optional - default: 30 - ## The OffsetRequests sent to each broker are batched by the kafka_consumer check in groups of 30 by default. - ## If the batch size is too big, you may see KafkaTimeoutError exceptions in the logs while - ## running the wakeup calls. - ## If the batch size is too small, the check will take longer to run. - # - # broker_requests_batch_size: 30 - ## @param tags - list of strings - optional ## A list of tags to attach to every metric and service check emitted by this instance. 
## diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 659821b24256a..6b4c5bc830f66 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -4,7 +4,7 @@ from time import time from datadog_checks.base import AgentCheck -from datadog_checks.kafka_consumer.client.generic_kafka_client import GenericKafkaClient +from datadog_checks.kafka_consumer.client import KafkaClient from datadog_checks.kafka_consumer.config import KafkaConfig @@ -25,7 +25,7 @@ def __init__(self, name, init_config, instances): super(KafkaCheck, self).__init__(name, init_config, instances) self.config = KafkaConfig(self.init_config, self.instance) self._context_limit = self.config._context_limit - self.client = GenericKafkaClient(self.config, self.get_tls_context(), self.log) + self.client = KafkaClient(self.config, self.get_tls_context(), self.log) self.check_initializations.append(self.config.validate_config) def check(self, _): diff --git a/kafka_consumer/hatch.toml b/kafka_consumer/hatch.toml index 47a27349e15b4..e1d574c017db0 100644 --- a/kafka_consumer/hatch.toml +++ b/kafka_consumer/hatch.toml @@ -12,13 +12,11 @@ post-install-commands = [ [envs.default.env-vars] ZK_VERSION = "3.6.4" -LEGACY_CLIENT = "false" AUTHENTICATION = "noauth" [[envs.default.matrix]] python = ["3.8"] version = ["1.1", "2.3", "3.3"] -impl = ["legacy"] [[envs.default.matrix]] python = ["3.8"] @@ -27,7 +25,6 @@ version = ["1.1", "2.3", "3.3"] [[envs.default.matrix]] python = ["3.8"] version = ["3.3"] -impl = ["legacy"] auth = ["ssl"] [[envs.default.matrix]] @@ -42,9 +39,6 @@ matrix.version.env-vars = [ { key = "KAFKA_VERSION", value = "2.3.1", if = ["2.3"] }, { key = "KAFKA_VERSION", value = "3.3.2", if = ["3.3"] }, ] -matrix.impl.env-vars = [ - { key = "LEGACY_CLIENT", value = "true", if = ["legacy"] }, -] matrix.auth.env-vars = "AUTHENTICATION" # We can allow kerberos once the agent image contains confluent-kafka built from source matrix.auth.e2e-env = { value = false, if = ["kerberos"] } @@ -52,4 +46,3 @@ matrix.auth.e2e-env = { value = false, if = ["kerberos"] } [envs.latest.env-vars] KAFKA_VERSION = "latest" ZK_VERSION = "3.6.4" -LEGACY_CLIENT = "false" diff --git a/kafka_consumer/pyproject.toml b/kafka_consumer/pyproject.toml index 8d8bb6d59f4f8..ed85ad7459946 100644 --- a/kafka_consumer/pyproject.toml +++ b/kafka_consumer/pyproject.toml @@ -41,9 +41,6 @@ deps = [ # confluent-kafka is built in omnibus, so bumping it here will have no real effect # if you bump this version, also bump the one in the `hatch.toml` file "confluent-kafka==2.0.2; python_version > '3.0'", - "gssapi==1.8.2; python_version > '3.0'", - "kafka-python==2.0.2", - "kazoo==2.9.0", ] [project.urls] diff --git a/kafka_consumer/tests/common.py b/kafka_consumer/tests/common.py index ab0ca9ec55513..54e3fc1da6683 100644 --- a/kafka_consumer/tests/common.py +++ b/kafka_consumer/tests/common.py @@ -4,7 +4,6 @@ import os import socket -from datadog_checks.base import is_affirmative from datadog_checks.dev import get_docker_hostname from datadog_checks.dev.utils import get_metadata_metrics @@ -16,7 +15,6 @@ PARTITIONS = [0, 1] BROKER_METRICS = ['kafka.broker_offset'] CONSUMER_METRICS = ['kafka.consumer_offset', 'kafka.consumer_lag'] -LEGACY_CLIENT = is_affirmative(os.environ.get('LEGACY_CLIENT', 'false')) AUTHENTICATION = os.environ.get('AUTHENTICATION', 'noauth') DOCKER_IMAGE_PATH = 
os.path.join(HERE, 'docker', AUTHENTICATION, "docker-compose.yaml") diff --git a/kafka_consumer/tests/conftest.py b/kafka_consumer/tests/conftest.py index d257068296948..4f2485fc4e679 100644 --- a/kafka_consumer/tests/conftest.py +++ b/kafka_consumer/tests/conftest.py @@ -14,15 +14,7 @@ from datadog_checks.dev.ci import running_on_ci from datadog_checks.kafka_consumer import KafkaCheck -from .common import ( - AUTHENTICATION, - DOCKER_IMAGE_PATH, - HERE, - KAFKA_CONNECT_STR, - LEGACY_CLIENT, - TOPICS, - get_authentication_configuration, -) +from .common import AUTHENTICATION, DOCKER_IMAGE_PATH, HERE, KAFKA_CONNECT_STR, TOPICS, get_authentication_configuration from .runners import Consumer, Producer CERTIFICATE_DIR = os.path.join(os.path.dirname(__file__), 'docker', 'ssl', 'certificate') @@ -31,46 +23,30 @@ PRIVATE_KEY = os.path.join(CERTIFICATE_DIR, 'key.pem') PRIVATE_KEY_PASSWORD = 'secret' -if LEGACY_CLIENT: - E2E_METADATA = { - 'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')], - 'start_commands': [ - 'apt-get update', - 'apt-get install -y build-essential', - ], - 'docker_volumes': [ - f'{HERE}/docker/ssl/certificate:/tmp/certificate', - ], - } -else: - E2E_METADATA = { - 'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')], - 'docker_volumes': [ - f'{HERE}/docker/ssl/certificate:/tmp/certificate', - f'{HERE}/docker/kerberos/kdc/krb5_agent.conf:/etc/krb5.conf', - ], - } +E2E_METADATA = { + 'custom_hosts': [('kafka1', '127.0.0.1'), ('kafka2', '127.0.0.1')], + 'docker_volumes': [ + f'{HERE}/docker/ssl/certificate:/tmp/certificate', + f'{HERE}/docker/kerberos/kdc/krb5_agent.conf:/etc/krb5.conf', + ], +} if AUTHENTICATION == "ssl": INSTANCE = { 'kafka_connect_str': "localhost:9092", 'tags': ['optional:tag1'], 'consumer_groups': {'my_consumer': {'marvel': [0]}}, - 'broker_requests_batch_size': 1, 'security_protocol': 'SSL', 'tls_cert': CERTIFICATE, 'tls_private_key': PRIVATE_KEY, 'tls_private_key_password': PRIVATE_KEY_PASSWORD, 'tls_ca_cert': ROOT_CERTIFICATE, - 'use_legacy_client': LEGACY_CLIENT, } elif AUTHENTICATION == "kerberos": INSTANCE = { 'kafka_connect_str': "localhost:9092", 'tags': ['optional:tag1'], 'consumer_groups': {'my_consumer': {'marvel': [0]}}, - 'broker_requests_batch_size': 1, - 'use_legacy_client': LEGACY_CLIENT, "sasl_mechanism": "GSSAPI", "sasl_kerberos_service_name": "kafka", "security_protocol": "SASL_PLAINTEXT", @@ -83,8 +59,6 @@ 'kafka_connect_str': KAFKA_CONNECT_STR, 'tags': ['optional:tag1'], 'consumer_groups': {'my_consumer': {'marvel': [0]}}, - 'broker_requests_batch_size': 1, - 'use_legacy_client': LEGACY_CLIENT, } E2E_INSTANCE = copy.deepcopy(INSTANCE) diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 8b6be831bf867..57b8e573cb285 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -6,50 +6,14 @@ import mock import pytest -from tests.common import LEGACY_CLIENT, metrics +from tests.common import metrics from datadog_checks.dev.utils import get_metadata_metrics from datadog_checks.kafka_consumer import KafkaCheck -from datadog_checks.kafka_consumer.client.kafka_python_client import OAuthTokenProvider pytestmark = [pytest.mark.unit] -@pytest.mark.skipif(not LEGACY_CLIENT, reason='not implemented yet with confluent-kafka') -def test_gssapi(kafka_instance, dd_run_check, check): - kafka_instance['sasl_mechanism'] = 'GSSAPI' - kafka_instance['security_protocol'] = 'SASL_PLAINTEXT' - kafka_instance['sasl_kerberos_service_name'] = 'kafka' - # assert the check 
doesn't fail with: - # Exception: Could not find main GSSAPI shared library. - with pytest.raises(Exception, match='check_version'): - dd_run_check(check(kafka_instance)) - - -@pytest.mark.skip(reason='Add a test that not only check the parameter but also run the check') -def test_oauth_token_client_config(check, kafka_instance): - kafka_instance['kafka_client_api_version'] = "3.3.2" - kafka_instance['security_protocol'] = "SASL_PLAINTEXT" - kafka_instance['sasl_mechanism'] = "OAUTHBEARER" - kafka_instance['sasl_oauth_token_provider'] = { - "url": "http://fake.url", - "client_id": "id", - "client_secret": "secret", - } - - with mock.patch('kafka.KafkaAdminClient') as kafka_admin_client: - kafka_consumer_check = check(kafka_instance) - kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client) - params = kafka_admin_client.call_args_list[0].kwargs - - assert params['security_protocol'] == 'SASL_PLAINTEXT' - assert params['sasl_mechanism'] == 'OAUTHBEARER' - assert params['sasl_oauth_token_provider'].reader._client_id == "id" - assert params['sasl_oauth_token_provider'].reader._client_secret == "secret" - assert params['sasl_oauth_token_provider'].reader._url == "http://fake.url" - assert isinstance(params['sasl_oauth_token_provider'], OAuthTokenProvider) - - @pytest.mark.parametrize( 'extra_config, expected_http_kwargs', [ @@ -68,27 +32,6 @@ def test_tls_config_legacy(extra_config, expected_http_kwargs, check, kafka_inst assert expected_http_kwargs == actual_options -@pytest.mark.skipif(not LEGACY_CLIENT, reason='The kafka-python implementation raises an exception') -def test_legacy_invalid_connect_str(dd_run_check, check, aggregator, caplog, kafka_instance): - caplog.set_level(logging.DEBUG) - kafka_instance['kafka_connect_str'] = 'invalid' - del kafka_instance['consumer_groups'] - with pytest.raises(Exception): - dd_run_check(check(kafka_instance)) - - for m in metrics: - aggregator.assert_metric(m, count=0) - - exception_msg = ( - 'ConfigurationError: Cannot fetch consumer offsets because no consumer_groups are specified and ' - 'monitor_unlisted_consumer_groups is False' - ) - - assert exception_msg in caplog.text - aggregator.assert_metrics_using_metadata(get_metadata_metrics()) - - -@pytest.mark.skipif(LEGACY_CLIENT, reason='The following condition only occurs in confluent-kafka implementation') def test_invalid_connect_str(dd_run_check, check, aggregator, caplog, kafka_instance): caplog.set_level(logging.DEBUG) kafka_instance['kafka_connect_str'] = 'invalid' @@ -142,7 +85,6 @@ def test_invalid_connect_str(dd_run_check, check, aggregator, caplog, kafka_inst ), ], ) -@pytest.mark.skipif(LEGACY_CLIENT, reason='kafka_python does not have confluent_kafka_client.AdminClient') def test_oauth_config( sasl_oauth_token_provider, expected_exception, mocked_admin_client, check, dd_run_check, kafka_instance ): @@ -157,7 +99,7 @@ def test_oauth_config( with expected_exception: with mock.patch( - 'datadog_checks.kafka_consumer.client.confluent_kafka_client.AdminClient', + 'datadog_checks.kafka_consumer.kafka_consumer.KafkaClient', return_value=mocked_admin_client, ): dd_run_check(check(kafka_instance)) @@ -165,7 +107,7 @@ def test_oauth_config( # TODO: After these tests are finished and the revamp is complete, # the tests should be refactored to be parameters instead of separate tests -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.GenericKafkaClient") +@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def 
test_when_consumer_lag_less_than_zero_then_emit_event( mock_generic_client, check, kafka_instance, dd_run_check, aggregator ): @@ -208,7 +150,7 @@ def test_when_consumer_lag_less_than_zero_then_emit_event( ) -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.GenericKafkaClient") +@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def test_when_partition_is_none_then_emit_warning_log( mock_generic_client, check, kafka_instance, dd_run_check, aggregator, caplog ): @@ -251,7 +193,7 @@ def test_when_partition_is_none_then_emit_warning_log( assert expected_warning in caplog.text -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.GenericKafkaClient") +@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def test_when_partition_not_in_partitions_then_emit_warning_log( mock_generic_client, check, kafka_instance, dd_run_check, aggregator, caplog ): @@ -294,7 +236,7 @@ def test_when_partition_not_in_partitions_then_emit_warning_log( assert expected_warning in caplog.text -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.GenericKafkaClient") +@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def test_when_highwater_metric_count_hit_context_limit_then_no_more_highwater_metrics( mock_generic_client, kafka_instance, dd_run_check, aggregator, caplog ): @@ -324,7 +266,7 @@ def test_when_highwater_metric_count_hit_context_limit_then_no_more_highwater_me assert expected_warning in caplog.text -@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.GenericKafkaClient") +@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient") def test_when_consumer_metric_count_hit_context_limit_then_no_more_consumer_metrics( mock_generic_client, kafka_instance, dd_run_check, aggregator, caplog ):
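
The rename of `client/confluent_kafka_client.py` to `client.py` folds the old abstract `KafkaClient` base and `ConfluentKafkaClient` into a single concrete `KafkaClient`. The sketch below mirrors the added constructor and lazy `kafka_client` property from this patch; the `AdminClient` construction shown is an assumption for illustration (the property body is truncated in this excerpt), and the real client also wires TLS/SASL options from `config`.

```python
# Minimal sketch of the consolidated client, assuming a confluent-kafka AdminClient
# is built lazily on first access. Authentication settings are omitted here.
from confluent_kafka.admin import AdminClient


class KafkaClient:
    def __init__(self, config, tls_context, log) -> None:
        self.config = config
        self.log = log
        self._kafka_client = None
        self._tls_context = tls_context

    @property
    def kafka_client(self):
        if self._kafka_client is None:
            # Hypothetical minimal configuration; the actual check adds security options.
            self._kafka_client = AdminClient({"bootstrap.servers": self.config._kafka_connect_str})
        return self._kafka_client
```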
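
With kafka-python's `ListGroupsRequest`/`OffsetFetchRequest` plumbing deleted, consumer group offsets can come from the higher-level calls that confluent-kafka's `AdminClient` exposes (available in the pinned confluent-kafka==2.0.2). The check's real implementation lives in `client.py` and is not fully shown in this diff, so treat the exact calls and result handling below as an assumption.

```python
# Hedged sketch: list groups, then fetch each group's committed offsets, keyed the
# same way as elsewhere in this patch: (group, topic, partition) -> offset.
from confluent_kafka import ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

consumer_offsets = {}
for listing in admin.list_consumer_groups().result().valid:
    futures = admin.list_consumer_group_offsets([ConsumerGroupTopicPartitions(listing.group_id)])
    for group_id, future in futures.items():
        for tp in future.result().topic_partitions:
            # Partitions without a committed offset may come back with a negative sentinel.
            consumer_offsets[(group_id, tp.topic, tp.partition)] = tp.offset
```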
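
Because `broker_requests_batch_size`, `BROKER_REQUESTS_BATCH_SIZE`, and the kafka-python `OffsetRequest` batching are all removed, highwater marks no longer need to be assembled from per-broker batched requests. One way to read them with confluent-kafka is `Consumer.get_watermark_offsets`, which returns a `(low, high)` tuple per partition; whether the check uses this exact call is an assumption, since only the removed batching comment is visible in this diff.

```python
# Hedged sketch of fetching the highwater (latest) offset for one topic partition.
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "dd-agent-sketch"})
_low, high = consumer.get_watermark_offsets(TopicPartition("marvel", 0), timeout=5, cached=False)
highwater_offsets = {("marvel", 0): high}
consumer.close()
```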
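
The removed kafka-python `get_partitions_for_topic` read partition IDs from the client's cluster metadata cache; the confluent-kafka equivalent can be served by `AdminClient.list_topics`, as in the sketch below (an illustration, not necessarily the check's exact code path).

```python
# Partition IDs for a topic come from the TopicMetadata.partitions dict.
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
metadata = admin.list_topics(topic="marvel", timeout=5)
partitions = list(metadata.topics["marvel"].partitions.keys())  # e.g. [0, 1]
```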
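
The deleted `OAuthTokenProvider` was a kafka-python `AbstractTokenProvider`; with that gone, the `sasl_oauth_token_provider` instance options (url, client_id, client_secret, as exercised by `test_oauth_config`) have to be translated into confluent-kafka configuration instead. The mapping below uses standard librdkafka OIDC properties, but how the check actually performs this translation is not shown in this diff, so the helper and its output are an assumption.

```python
# Hedged sketch: map the instance's sasl_oauth_token_provider options onto
# librdkafka's SASL/OAUTHBEARER OIDC settings.
def oauth_client_config(instance):
    provider = instance["sasl_oauth_token_provider"]
    return {
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "OAUTHBEARER",
        "sasl.oauthbearer.method": "oidc",
        "sasl.oauthbearer.token.endpoint.url": provider["url"],
        "sasl.oauthbearer.client.id": provider["client_id"],
        "sasl.oauthbearer.client.secret": provider["client_secret"],
    }
```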
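
On the test side, every `mock.patch` target moves from `GenericKafkaClient` to the `KafkaClient` name imported inside `kafka_consumer.py`, since the check now instantiates that class directly. The sketch below shows the updated mocking pattern with a hypothetical test; the offset dict shapes follow the keys used elsewhere in this patch, and the fixtures (`check`, `kafka_instance`, `dd_run_check`, `aggregator`) are the ones the existing tests already use.

```python
# Hedged sketch of a unit test against the new client seam; the check is assumed to
# compute kafka.consumer_lag from the difference between highwater and committed offsets.
import mock


@mock.patch("datadog_checks.kafka_consumer.kafka_consumer.KafkaClient")
def test_lag_is_reported(mock_client_cls, check, kafka_instance, dd_run_check, aggregator):
    mock_client = mock.MagicMock()
    mock_client.get_consumer_offsets.return_value = {("my_consumer", "marvel", 0): 2}
    mock_client.get_highwater_offsets.return_value = {("marvel", 0): 3}
    mock_client.get_partitions_for_topic.return_value = [0]
    mock_client_cls.return_value = mock_client

    dd_run_check(check(kafka_instance))
    aggregator.assert_metric("kafka.consumer_lag")
```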