From 5f8e380894409dc9761d0342d74a832d35533b65 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 10 Feb 2023 15:23:02 -0500 Subject: [PATCH 01/12] Map out structure --- .../kafka_consumer/kafka_consumer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 73979e5488ddc..f6ef366018bb1 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -60,6 +60,24 @@ def __init__(self, name, init_config, instances): def check(self, _): return self.sub_check.check() + # KafkaCheck + # - check() + # - create_kafka_client() - return KafkaClient + + # KafkaClient + # - get_consumer_offset_and_lag() + # - get_broker_offset() + # - report_consumer_offset_and_lag() + # - report_broker_offset() + + # KafkaPythonClient + # actual implementation + # "_report_highwater_offsets()" + + # ConfluentKafkaClient + # actual implementation + + def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): """Emit an event to the Datadog Event Stream.""" event_dict = { From 6d3fda803b203076f02401c91a0fb97321f3128b Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 10 Feb 2023 15:31:40 -0500 Subject: [PATCH 02/12] Combine classes --- .../kafka_consumer/kafka_consumer.py | 27 +++---------------- 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index f6ef366018bb1..b604c6d01e7e1 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -54,11 +54,8 @@ def __init__(self, name, init_config, instances): self.instance.get('monitor_all_broker_highwatermarks', False) ) self._consumer_groups = self.instance.get('consumer_groups', {}) - - self.sub_check = NewKafkaConsumerCheck(self) - - def check(self, _): - return self.sub_check.check() + self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) + self._kafka_client = None # KafkaCheck # - check() @@ -153,24 +150,6 @@ def _create_kafka_client(self, clazz): ) -class NewKafkaConsumerCheck(object): - """ - Check the offsets and lag of Kafka consumers. This check also returns broker highwater offsets. - - For details about the supported options, see the associated `conf.yaml.example`. 
- """ - - def __init__(self, parent_check): - self._parent_check = parent_check - self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) - self._kafka_client = None - - def __getattr__(self, item): - try: - return getattr(self._parent_check, item) - except AttributeError: - raise AttributeError("NewKafkaConsumerCheck has no attribute called {}".format(item)) - @property def kafka_client(self): if self._kafka_client is None: @@ -184,7 +163,7 @@ def kafka_client(self): self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version) return self._kafka_client - def check(self): + def check(self, _): """The main entrypoint of the check.""" self._consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset} self._highwater_offsets = {} # Expected format: {(topic, partition): offset} From 0971cf6ba84459550560077c0a0bceb09435d05d Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 10 Feb 2023 15:34:15 -0500 Subject: [PATCH 03/12] Remove deprecated call --- kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py | 3 --- kafka_consumer/tests/test_kafka_consumer.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index b604c6d01e7e1..4f10b8478029e 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -88,9 +88,6 @@ def send_event(self, title, text, tags, event_type, aggregation_key, severity='i } self.event(event_dict) - def create_kafka_client(self): - return self._create_kafka_client(clazz=KafkaClient) - def create_kafka_admin_client(self): return self._create_kafka_client(clazz=KafkaAdminClient) diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 19d9fb0511125..c7f8507ab9ee5 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -49,7 +49,7 @@ def test_tls_config_ok(kafka_instance_tls): assert tls_context.check_hostname is True assert tls_context.tls_cert is not None assert tls_context.check_hostname is True - assert kafka_consumer_check.create_kafka_client is not None + assert kafka_consumer_check.create_kafka_admin_client is not None @pytest.mark.parametrize( From c3a7c898d8503e1217add614f7c7bbca2689ad99 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 10 Feb 2023 15:37:01 -0500 Subject: [PATCH 04/12] Remove clazz --- .../datadog_checks/kafka_consumer/kafka_consumer.py | 10 ++++------ kafka_consumer/tests/test_kafka_consumer.py | 4 ++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 4f10b8478029e..56cd25d529994 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -6,7 +6,7 @@ from time import time import six -from kafka import KafkaAdminClient, KafkaClient +from kafka import KafkaAdminClient from kafka import errors as kafka_errors from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import ListGroupsRequest @@ -74,7 +74,6 @@ def __init__(self, name, init_config, instances): # ConfluentKafkaClient # actual implementation - def send_event(self, title, text, tags, 
event_type, aggregation_key, severity='info'): """Emit an event to the Datadog Event Stream.""" event_dict = { @@ -89,7 +88,7 @@ def send_event(self, title, text, tags, event_type, aggregation_key, severity='i self.event(event_dict) def create_kafka_admin_client(self): - return self._create_kafka_client(clazz=KafkaAdminClient) + return self._create_kafka_client() def validate_consumer_groups(self): """Validate any explicitly specified consumer groups. @@ -108,7 +107,7 @@ def validate_consumer_groups(self): for partition in partitions: assert isinstance(partition, int) - def _create_kafka_client(self, clazz): + def _create_kafka_client(self): kafka_connect_str = self.instance.get('kafka_connect_str') if not isinstance(kafka_connect_str, (string_types, list)): raise ConfigurationError('kafka_connect_str should be string or list of strings') @@ -122,7 +121,7 @@ def _create_kafka_client(self, clazz): tls_context.load_verify_locations(crlfile) tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF - return clazz( + return KafkaAdminClient( bootstrap_servers=kafka_connect_str, client_id='dd-agent', request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000, @@ -146,7 +145,6 @@ def _create_kafka_client(self, clazz): ssl_context=tls_context, ) - @property def kafka_client(self): if self._kafka_client is None: diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index c7f8507ab9ee5..b885edd81b62e 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -34,7 +34,7 @@ def test_gssapi(kafka_instance, dd_run_check): @pytest.mark.unit def test_tls_config_ok(kafka_instance_tls): with mock.patch('datadog_checks.base.utils.tls.ssl') as ssl: - with mock.patch('kafka.KafkaClient') as kafka_client: + with mock.patch('kafka.KafkaAdminClient') as kafka_client: # mock Kafka Client kafka_client.return_value = mock.MagicMock() @@ -44,7 +44,7 @@ def test_tls_config_ok(kafka_instance_tls): ssl.SSLContext.return_value = tls_context kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance_tls]) - kafka_consumer_check._create_kafka_client(clazz=kafka_client) + kafka_consumer_check._create_kafka_client() assert tls_context.check_hostname is True assert tls_context.tls_cert is not None From 1ca6e60b69168db8c50892cea8d148878a5e0d8f Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 10 Feb 2023 15:51:53 -0500 Subject: [PATCH 05/12] Create structure for kafka client classes --- .../client/confluent_kafka_client.py | 15 +++++++++++++++ .../kafka_consumer/client/kafka_client.py | 15 +++++++++++++++ .../kafka_consumer/client/kafka_python_client.py | 15 +++++++++++++++ .../kafka_consumer/kafka_consumer.py | 1 + 4 files changed, 46 insertions(+) create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py new file mode 100644 index 0000000000000..3a71ac383903e --- /dev/null +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -0,0 +1,15 @@ +class ConfluentKafkaClient: + def __init__(self) -> None: + pass + + def get_consumer_offset_and_lag(): + pass + + def 
get_broker_offset(): + pass + + def report_consumer_offset_and_lag(): + pass + + def report_broker_offset(): + pass \ No newline at end of file diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py new file mode 100644 index 0000000000000..5bdaefca5f78c --- /dev/null +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py @@ -0,0 +1,15 @@ +class KafkaClient: + def __init__(self) -> None: + pass + + def get_consumer_offset_and_lag(): + pass + + def get_broker_offset(): + pass + + def report_consumer_offset_and_lag(): + pass + + def report_broker_offset(): + pass \ No newline at end of file diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py new file mode 100644 index 0000000000000..a5ab53273f0d4 --- /dev/null +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py @@ -0,0 +1,15 @@ +class KafkaPythonClient: + def __init__(self) -> None: + pass + + def get_consumer_offset_and_lag(): + pass + + def get_broker_offset(): + pass + + def report_consumer_offset_and_lag(): + pass + + def report_broker_offset(): + pass \ No newline at end of file diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 56cd25d529994..1d04e1bd3b016 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -56,6 +56,7 @@ def __init__(self, name, init_config, instances): self._consumer_groups = self.instance.get('consumer_groups', {}) self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) self._kafka_client = None + self.client = None # KafkaCheck # - check() From 3ceb5adfd65456eda2eb090007214dbe0c68ee50 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 10 Feb 2023 16:03:57 -0500 Subject: [PATCH 06/12] Undo --- .../datadog_checks/kafka_consumer/kafka_consumer.py | 11 +++++++---- kafka_consumer/tests/test_kafka_consumer.py | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 1d04e1bd3b016..a8415c25df158 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -6,7 +6,7 @@ from time import time import six -from kafka import KafkaAdminClient +from kafka import KafkaAdminClient, KafkaClient from kafka import errors as kafka_errors from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import ListGroupsRequest @@ -88,8 +88,11 @@ def send_event(self, title, text, tags, event_type, aggregation_key, severity='i } self.event(event_dict) + def create_kafka_client(self): + return self._create_kafka_client(clazz=KafkaClient) + def create_kafka_admin_client(self): - return self._create_kafka_client() + return self._create_kafka_client(clazz=KafkaAdminClient) def validate_consumer_groups(self): """Validate any explicitly specified consumer groups. 
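A quick orientation sketch, not part of any patch in this series: the commits above and below move toward the layout mapped out in the first commit's comment block — an abstract client interface, a kafka-python-backed implementation, and a small factory. Method names follow the later commits shown in this section, and the Confluent side stays a stub here just as it does in the series.

from abc import ABC, abstractmethod

class KafkaClient(ABC):
    # Interface the check talks to, independent of the underlying Kafka library.
    def __init__(self, check):
        self.check = check

    @abstractmethod
    def get_consumer_offsets(self):
        pass

    @abstractmethod
    def get_highwater_offsets(self):
        pass


class KafkaPythonClient(KafkaClient):
    # kafka-python backed implementation; ConfluentKafkaClient would mirror this shape.
    def get_consumer_offsets(self):
        pass

    def get_highwater_offsets(self):
        pass


def make_client(check) -> KafkaClient:
    # The factory added later in the series hands the check a concrete client.
    return KafkaPythonClient(check)
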
@@ -108,7 +111,7 @@ def validate_consumer_groups(self): for partition in partitions: assert isinstance(partition, int) - def _create_kafka_client(self): + def _create_kafka_client(self, clazz): kafka_connect_str = self.instance.get('kafka_connect_str') if not isinstance(kafka_connect_str, (string_types, list)): raise ConfigurationError('kafka_connect_str should be string or list of strings') @@ -122,7 +125,7 @@ def _create_kafka_client(self): tls_context.load_verify_locations(crlfile) tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF - return KafkaAdminClient( + return clazz( bootstrap_servers=kafka_connect_str, client_id='dd-agent', request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000, diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index b885edd81b62e..19d9fb0511125 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -34,7 +34,7 @@ def test_gssapi(kafka_instance, dd_run_check): @pytest.mark.unit def test_tls_config_ok(kafka_instance_tls): with mock.patch('datadog_checks.base.utils.tls.ssl') as ssl: - with mock.patch('kafka.KafkaAdminClient') as kafka_client: + with mock.patch('kafka.KafkaClient') as kafka_client: # mock Kafka Client kafka_client.return_value = mock.MagicMock() @@ -44,12 +44,12 @@ def test_tls_config_ok(kafka_instance_tls): ssl.SSLContext.return_value = tls_context kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance_tls]) - kafka_consumer_check._create_kafka_client() + kafka_consumer_check._create_kafka_client(clazz=kafka_client) assert tls_context.check_hostname is True assert tls_context.tls_cert is not None assert tls_context.check_hostname is True - assert kafka_consumer_check.create_kafka_admin_client is not None + assert kafka_consumer_check.create_kafka_client is not None @pytest.mark.parametrize( From 2d1e363fa7f23aa9e5b16f326c633d0c6a6ef9de Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 10 Feb 2023 16:31:21 -0500 Subject: [PATCH 07/12] Fix style --- .../kafka_consumer/client/confluent_kafka_client.py | 10 +++++----- .../kafka_consumer/client/kafka_client.py | 10 +++++----- .../kafka_consumer/client/kafka_python_client.py | 10 +++++----- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py index 3a71ac383903e..40f06821f0b4c 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -2,14 +2,14 @@ class ConfluentKafkaClient: def __init__(self) -> None: pass - def get_consumer_offset_and_lag(): + def get_consumer_offset_and_lag(self): pass - def get_broker_offset(): + def get_broker_offset(self): pass - def report_consumer_offset_and_lag(): + def report_consumer_offset_and_lag(self): pass - def report_broker_offset(): - pass \ No newline at end of file + def report_broker_offset(self): + pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py index 5bdaefca5f78c..836db32fd3946 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py @@ -2,14 +2,14 @@ class KafkaClient: def __init__(self) -> None: pass - def 
get_consumer_offset_and_lag(): + def get_consumer_offset_and_lag(self): pass - def get_broker_offset(): + def get_broker_offset(self): pass - def report_consumer_offset_and_lag(): + def report_consumer_offset_and_lag(self): pass - def report_broker_offset(): - pass \ No newline at end of file + def report_broker_offset(self): + pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py index a5ab53273f0d4..76d32d7125de6 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py @@ -2,14 +2,14 @@ class KafkaPythonClient: def __init__(self) -> None: pass - def get_consumer_offset_and_lag(): + def get_consumer_offset_and_lag(self): pass - def get_broker_offset(): + def get_broker_offset(self): pass - def report_consumer_offset_and_lag(): + def report_consumer_offset_and_lag(self): pass - def report_broker_offset(): - pass \ No newline at end of file + def report_broker_offset(self): + pass From 88ba0e63b01886358f7d60e66bf4d0add02d3ece Mon Sep 17 00:00:00 2001 From: Fanny Jiang Date: Mon, 13 Feb 2023 13:09:28 -0500 Subject: [PATCH 08/12] Add consumer offset and log collection (#13944) --- .../client/confluent_kafka_client.py | 3 + .../kafka_consumer/client/kafka_client.py | 3 + .../client/kafka_python_client.py | 286 +++++++++++++++++- 3 files changed, 286 insertions(+), 6 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py index 40f06821f0b4c..b6f8dbf248cb5 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -1,3 +1,6 @@ +# (C) Datadog, Inc. 2023-present +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) class ConfluentKafkaClient: def __init__(self) -> None: pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py index 836db32fd3946..1398feebd71b1 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py @@ -1,3 +1,6 @@ +# (C) Datadog, Inc. 2023-present +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) class KafkaClient: def __init__(self) -> None: pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py index 76d32d7125de6..2398f805b4d61 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py @@ -1,15 +1,289 @@ +# (C) Datadog, Inc. 
2023-present +# All rights reserved +# Licensed under Simplified BSD License (see LICENSE) +from collections import defaultdict +from time import time + +import six +from kafka.protocol.admin import ListGroupsRequest +from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest +from kafka.structs import TopicPartition +from six import string_types + +from datadog_checks.base import ConfigurationError + + class KafkaPythonClient: - def __init__(self) -> None: - pass + def __init__(self, check) -> None: + self.check = check + self.log = check.log + self.kafka_client = check.kafka_client - def get_consumer_offset_and_lag(self): - pass + def get_consumer_offsets(self): + return self._get_consumer_offsets def get_broker_offset(self): pass - def report_consumer_offset_and_lag(self): - pass + def report_consumer_offsets_and_lag(self): + return self._report_consumer_offsets_and_lag def report_broker_offset(self): pass + + def _validate_consumer_groups(self): + """Validate any explicitly specified consumer groups. + + consumer_groups = {'consumer_group': {'topic': [0, 1]}} + """ + assert isinstance(self.check._consumer_groups, dict) + for consumer_group, topics in self.check._consumer_groups.items(): + assert isinstance(consumer_group, string_types) + assert isinstance(topics, dict) or topics is None # topics are optional + if topics is not None: + for topic, partitions in topics.items(): + assert isinstance(topic, string_types) + assert isinstance(partitions, (list, tuple)) or partitions is None # partitions are optional + if partitions is not None: + for partition in partitions: + assert isinstance(partition, int) + + def _report_consumer_offsets_and_lag(self, contexts_limit): + """Report the consumer offsets and consumer lag.""" + reported_contexts = 0 + self.log.debug("Reporting consumer offsets and lag metrics") + for (consumer_group, topic, partition), consumer_offset in self.check._consumer_offsets.items(): + if reported_contexts >= contexts_limit: + self.log.debug( + "Reported contexts number %s greater than or equal to contexts limit of %s, returning", + str(reported_contexts), + str(contexts_limit), + ) + return + consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group] + consumer_group_tags.extend(self.check._custom_tags) + + partitions = self.kafka_client._client.cluster.partitions_for_topic(topic) + self.log.debug("Received partitions %s for topic %s", partitions, topic) + if partitions is not None and partition in partitions: + # report consumer offset if the partition is valid because even if leaderless the consumer offset will + # be valid once the leader failover completes + self.check.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags) + reported_contexts += 1 + + if (topic, partition) not in self.check._highwater_offsets: + self.log.warning( + "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset " + "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.", + consumer_group, + topic, + partition, + ) + continue + producer_offset = self.check._highwater_offsets[(topic, partition)] + consumer_lag = producer_offset - consumer_offset + if reported_contexts < contexts_limit: + self.check.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags) + reported_contexts += 1 + + if consumer_lag < 0: + # this will effectively result in data loss, so emit an event for max visibility + title = "Negative consumer lag for group: 
{}.".format(consumer_group) + message = ( + "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never " + "happen and will result in the consumer skipping new messages until the lag turns " + "positive.".format(consumer_group, topic, partition) + ) + key = "{}:{}:{}".format(consumer_group, topic, partition) + self._send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error") + self.log.debug(message) + else: + if partitions is None: + msg = ( + "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions " + "in the cluster, so skipping reporting these offsets." + ) + else: + msg = ( + "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't " + "included in the cluster partitions, so skipping reporting these offsets." + ) + self.log.warning(msg, consumer_group, topic, partition) + self.kafka_client._client.cluster.request_update() # force metadata update on next poll() + + def _get_consumer_offsets(self): + """Fetch Consumer Group offsets from Kafka. + + Also fetch consumer_groups, topics, and partitions if not already specified. + + For speed, all the brokers are queried in parallel using callbacks. + The callback flow is: + A: When fetching all groups ('monitor_unlisted_consumer_groups' is True): + 1. Issue a ListGroupsRequest to every broker + 2. Attach a callback to each ListGroupsRequest that issues OffsetFetchRequests for every group. + Note: Because a broker only returns groups for which it is the coordinator, as an optimization we + skip the FindCoordinatorRequest + B: When fetching only listed groups: + 1. Issue a FindCoordintorRequest for each group + 2. Attach a callback to each FindCoordinatorResponse that issues OffsetFetchRequests for that group + Both: + 3. Attach a callback to each OffsetFetchRequest that parses the response + and saves the consumer group's offsets + """ + # Store the list of futures on the object because some of the callbacks create/store additional futures and they + # don't have access to variables scoped to this method, only to the object scope + self._consumer_futures = [] + + if self.check._monitor_unlisted_consumer_groups: + for broker in self.kafka_client._client.cluster.brokers(): + # FIXME: This is using a workaround to skip socket wakeup, which causes blocking + # (see https://github.com/dpkp/kafka-python/issues/2286). + # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official + # implementation for this function instead. + list_groups_future = self._list_consumer_groups_send_request(broker.nodeId) + list_groups_future.add_callback(self._list_groups_callback, broker.nodeId) + self._consumer_futures.append(list_groups_future) + elif self.check._consumer_groups: + self._validate_consumer_groups() + for consumer_group in self.check._consumer_groups: + find_coordinator_future = self._find_coordinator_id_send_request(consumer_group) + find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group) + self._consumer_futures.append(find_coordinator_future) + else: + raise ConfigurationError( + "Cannot fetch consumer offsets because no consumer_groups are specified and " + "monitor_unlisted_consumer_groups is %s." % self.check._monitor_unlisted_consumer_groups + ) + + # Loop until all futures resolved. 
+ self.kafka_client._wait_for_futures(self._consumer_futures) + del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs + + def _list_groups_callback(self, broker_id, response): + """Callback that takes a ListGroupsResponse and issues an OffsetFetchRequest for each group. + + broker_id must be manually passed in because it is not present in the response. Keeping track of the broker that + gave us this response lets us skip issuing FindCoordinatorRequests because Kafka brokers only include + consumer groups in their ListGroupsResponse when they are the coordinator for that group. + """ + for consumer_group, group_type in self.kafka_client._list_consumer_groups_process_response(response): + # consumer groups from Kafka < 0.9 that store their offset in Kafka don't use Kafka for group-coordination + # so their group_type is empty + if group_type in ('consumer', ''): + single_group_offsets_future = self._list_consumer_group_offsets_send_request( + group_id=consumer_group, group_coordinator_id=broker_id + ) + single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group) + self._consumer_futures.append(single_group_offsets_future) + + def _find_coordinator_callback(self, consumer_group, response): + """Callback that takes a FindCoordinatorResponse and issues an OffsetFetchRequest for the group. + + consumer_group must be manually passed in because it is not present in the response, but we need it in order to + associate these offsets to the proper consumer group. + + The OffsetFetchRequest is scoped to the topics and partitions that are specified in the check config. If + topics are unspecified, it will fetch all known offsets for that consumer group. Similarly, if the partitions + are unspecified for a topic listed in the config, offsets are fetched for all the partitions within that topic. + """ + coordinator_id = self.kafka_client._find_coordinator_id_process_response(response) + topics = self.check._consumer_groups[consumer_group] + if not topics: + topic_partitions = None # None signals to fetch all known offsets for the consumer group + else: + # transform [("t1", [1, 2])] into [TopicPartition("t1", 1), TopicPartition("t1", 2)] + topic_partitions = [] + for topic, partitions in topics.items(): + if not partitions: # If partitions aren't specified, fetch all partitions in the topic + partitions = self.kafka_client._client.cluster.partitions_for_topic(topic) + topic_partitions.extend([TopicPartition(topic, p) for p in partitions]) + single_group_offsets_future = self._list_consumer_group_offsets_send_request( + group_id=consumer_group, group_coordinator_id=coordinator_id, partitions=topic_partitions + ) + single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group) + self._consumer_futures.append(single_group_offsets_future) + + def _single_group_offsets_callback(self, consumer_group, response): + """Callback that parses an OffsetFetchResponse and saves it to the consumer_offsets dict. + + consumer_group must be manually passed in because it is not present in the response, but we need it in order to + associate these offsets to the proper consumer group. 
+ """ + single_group_offsets = self.kafka_client._list_consumer_group_offsets_process_response(response) + self.log.debug("Single group offsets: %s", single_group_offsets) + for (topic, partition), (offset, _metadata) in single_group_offsets.items(): + # If the OffsetFetchRequest explicitly specified partitions, the offset could returned as -1, meaning there + # is no recorded offset for that partition... for example, if the partition doesn't exist in the cluster. + # So ignore it. + if offset == -1: + self.kafka_client._client.cluster.request_update() # force metadata update on next poll() + continue + key = (consumer_group, topic, partition) + self.check._consumer_offsets[key] = offset + + def _list_consumer_groups_send_request(self, broker_id): + kafka_version = self.kafka_client._matching_api_version(ListGroupsRequest) + if kafka_version <= 2: + request = ListGroupsRequest[kafka_version]() + else: + raise NotImplementedError( + "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient.".format(kafka_version) + ) + # Disable wakeup when sending request to prevent blocking send requests + return self.check._send_request_to_node(broker_id, request, wakeup=False) + + def _find_coordinator_id_send_request(self, group_id): + """Send a FindCoordinatorRequest to a broker. + :param group_id: The consumer group ID. This is typically the group + name as a string. + :return: A message future + """ + version = 0 + request = GroupCoordinatorRequest[version](group_id) + return self.check._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False) + + def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_id, partitions=None): + """Send an OffsetFetchRequest to a broker. + :param group_id: The consumer group id name for which to fetch offsets. + :param group_coordinator_id: The node_id of the group's coordinator + broker. + :return: A message future + """ + version = self.kafka_client._matching_api_version(OffsetFetchRequest) + if version <= 3: + if partitions is None: + if version <= 1: + raise ValueError( + """OffsetFetchRequest_v{} requires specifying the + partitions for which to fetch offsets. Omitting the + partitions is only supported on brokers >= 0.10.2. 
+ For details, see KIP-88.""".format( + version + ) + ) + topics_partitions = None + else: + # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])] + topics_partitions_dict = defaultdict(set) + for topic, partition in partitions: + topics_partitions_dict[topic].add(partition) + topics_partitions = list(six.iteritems(topics_partitions_dict)) + request = OffsetFetchRequest[version](group_id, topics_partitions) + else: + raise NotImplementedError( + "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.".format(version) + ) + return self.check._send_request_to_node(group_coordinator_id, request, wakeup=False) + + def _send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): + """Emit an event to the Datadog Event Stream.""" + event_dict = { + 'timestamp': int(time()), + 'msg_title': title, + 'event_type': event_type, + 'alert_type': severity, + 'msg_text': text, + 'tags': tags, + 'aggregation_key': aggregation_key, + } + self.check.event(event_dict) From 64b35889a41c7417d62f8e100ef2b6bf365efa98 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Mon, 13 Feb 2023 13:35:33 -0500 Subject: [PATCH 09/12] Refactor broker offset metric collection (#13934) * Add broker offset metric collection * Change import * Clean up broker offset functions and change names * Fix style * Use updated values for check * Clean up functions --- .../client/confluent_kafka_client.py | 5 +- .../kafka_consumer/client/kafka_client.py | 5 +- .../client/kafka_python_client.py | 163 ++++++++++++++++-- 3 files changed, 161 insertions(+), 12 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py index b6f8dbf248cb5..bd990141608c6 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -5,7 +5,7 @@ class ConfluentKafkaClient: def __init__(self) -> None: pass - def get_consumer_offset_and_lag(self): + def get_consumer_offsets(self): pass def get_broker_offset(self): @@ -16,3 +16,6 @@ def report_consumer_offset_and_lag(self): def report_broker_offset(self): pass + + def collect_broker_metadata(self): + pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py index 1398feebd71b1..6ad5525f1b150 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py @@ -5,7 +5,7 @@ class KafkaClient: def __init__(self) -> None: pass - def get_consumer_offset_and_lag(self): + def get_consumer_offsets(self): pass def get_broker_offset(self): @@ -16,3 +16,6 @@ def report_consumer_offset_and_lag(self): def report_broker_offset(self): pass + + def collect_broker_metadata(self): + pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py index 2398f805b4d61..0f2299bff4d9b 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py @@ -4,13 +4,15 @@ from collections import defaultdict from time import time -import six +from kafka import errors as kafka_errors from kafka.protocol.admin import ListGroupsRequest from 
kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest +from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse from kafka.structs import TopicPartition -from six import string_types +from six import iteritems, string_types -from datadog_checks.base import ConfigurationError +from datadog_checks.base import AgentCheck, ConfigurationError +from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS class KafkaPythonClient: @@ -19,17 +21,158 @@ def __init__(self, check) -> None: self.log = check.log self.kafka_client = check.kafka_client + @AgentCheck.metadata_entrypoint + def collect_broker_metadata(self): + return self._collect_broker_metadata + + def _collect_broker_metadata(self): + version_data = [str(part) for part in self.kafka_client._client.check_version()] + version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)} + + self.set_metadata( + 'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts + ) + def get_consumer_offsets(self): return self._get_consumer_offsets def get_broker_offset(self): - pass + return self._get_broker_offset + + def _get_broker_offset(self): + """Fetch highwater offsets for topic_partitions in the Kafka cluster. + + Do this for all partitions in the cluster because even if it has no consumers, we may want to measure whether + producers are successfully producing. + + If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise highwater + mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer + offset. + + Internal Kafka topics like __consumer_offsets, __transaction_state, etc are always excluded. + + Any partitions that don't currently have a leader will be skipped. + + Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader: + https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset) + + For speed, all the brokers are queried in parallel using callbacks. The callback flow is: + 1. Issue an OffsetRequest to every broker + 2. Attach a callback to each OffsetResponse that parses the response and saves the highwater offsets. + """ + highwater_futures = [] # No need to store on object because the callbacks don't create additional futures + + # If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for + # which this run of the check has at least once saved consumer offset. This is later used as a filter for + # excluding partitions. 
+ if not self.check._monitor_all_broker_highwatermarks: + tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self.check._consumer_offsets} + + for batch in self.batchify(self.kafka_client._client.cluster.brokers(), self.check._broker_requests_batch_size): + for broker in batch: + broker_led_partitions = self.kafka_client._client.cluster.partitions_for_broker(broker.nodeId) + if broker_led_partitions is None: + continue + + # Take the partitions for which this broker is the leader and group them by topic in order to construct + # the OffsetRequest while simultaneously filtering out partitions we want to exclude + partitions_grouped_by_topic = defaultdict(list) + for topic, partition in broker_led_partitions: + # No sense fetching highwater offsets for internal topics + if topic not in KAFKA_INTERNAL_TOPICS and ( + self.check._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset + ): + partitions_grouped_by_topic[topic].append(partition) + + # Construct the OffsetRequest + max_offsets = 1 + request = OffsetRequest[0]( + replica_id=-1, + topics=[ + (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions]) + for topic, partitions in partitions_grouped_by_topic.items() + ], + ) + + # We can disable wakeup here because it is the same thread doing both polling and sending. Also, it + # is possible that the wakeup itself could block if a large number of sends were processed beforehand. + highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False) + + highwater_future.add_callback(self._highwater_offsets_callback) + highwater_futures.append(highwater_future) + + # Loop until all futures resolved. + self.kafka_client._wait_for_futures(highwater_futures) + + def _highwater_offsets_callback(self, response): + """Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict.""" + if type(response) not in OffsetResponse: + raise RuntimeError("response type should be OffsetResponse, but instead was %s." % type(response)) + for topic, partitions_data in response.topics: + for partition, error_code, offsets in partitions_data: + error_type = kafka_errors.for_code(error_code) + if error_type is kafka_errors.NoError: + self.check._highwater_offsets[(topic, partition)] = offsets[0] + elif error_type is kafka_errors.NotLeaderForPartitionError: + self.log.warning( + "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen " + "if the broker that was the partition leader when kafka_admin_client last fetched metadata is " + "no longer the leader.", + error_type.message, + error_type.errno, + topic, + partition, + ) + self.kafka_client._client.cluster.request_update() # force metadata update on next poll() + elif error_type is kafka_errors.UnknownTopicOrPartitionError: + self.log.warning( + "Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only " + "happen if the topic is currently being deleted or the check configuration lists non-existent " + "topic partitions.", + error_type.message, + error_type.errno, + topic, + partition, + ) + else: + raise error_type( + "Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, " + "partition: %s." 
% (topic, partition) + ) + + @staticmethod + def batchify(iterable, batch_size): + iterable = list(iterable) + return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size)) + + # FIXME: This is using a workaround to skip socket wakeup, which causes blocking + # (see https://github.com/dpkp/kafka-python/issues/2286). + # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official + # implementation for this function instead. + def _send_request_to_node(self, node_id, request, wakeup=True): + while not self.kafka_client._client.ready(node_id): + # poll until the connection to broker is ready, otherwise send() + # will fail with NodeNotReadyError + self.kafka_client._client.poll() + return self.kafka_client._client.send(node_id, request, wakeup=wakeup) def report_consumer_offsets_and_lag(self): return self._report_consumer_offsets_and_lag - def report_broker_offset(self): - pass + def report_broker_offset(self, contexts_limit): + return self._report_broker_offset + + def _report_broker_offset(self, contexts_limit): + """Report the broker highwater offsets.""" + reported_contexts = 0 + self.log.debug("Reporting broker offset metric") + for (topic, partition), highwater_offset in self._highwater_offsets.items(): + broker_tags = ['topic:%s' % topic, 'partition:%s' % partition] + broker_tags.extend(self.check._custom_tags) + self.check.gauge('broker_offset', highwater_offset, tags=broker_tags) + reported_contexts += 1 + if reported_contexts == contexts_limit: + return def _validate_consumer_groups(self): """Validate any explicitly specified consumer groups. @@ -230,7 +373,7 @@ def _list_consumer_groups_send_request(self, broker_id): "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient.".format(kafka_version) ) # Disable wakeup when sending request to prevent blocking send requests - return self.check._send_request_to_node(broker_id, request, wakeup=False) + return self._send_request_to_node(broker_id, request, wakeup=False) def _find_coordinator_id_send_request(self, group_id): """Send a FindCoordinatorRequest to a broker. @@ -240,7 +383,7 @@ def _find_coordinator_id_send_request(self, group_id): """ version = 0 request = GroupCoordinatorRequest[version](group_id) - return self.check._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False) + return self._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False) def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_id, partitions=None): """Send an OffsetFetchRequest to a broker. 
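The batchify helper added earlier in this patch is plain Python; restated standalone with a tiny example (the broker IDs are made up), it simply chunks the broker list so offset requests can be issued in groups of broker_requests_batch_size:

def batchify(iterable, batch_size):
    # Same logic as the staticmethod above: materialize, then yield fixed-size slices.
    iterable = list(iterable)
    return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size))

for batch in batchify(['broker-0', 'broker-1', 'broker-2', 'broker-3', 'broker-4'], 2):
    print(batch)
# prints ['broker-0', 'broker-1'], then ['broker-2', 'broker-3'], then ['broker-4']
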
@@ -267,13 +410,13 @@ def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_ topics_partitions_dict = defaultdict(set) for topic, partition in partitions: topics_partitions_dict[topic].add(partition) - topics_partitions = list(six.iteritems(topics_partitions_dict)) + topics_partitions = list(iteritems(topics_partitions_dict)) request = OffsetFetchRequest[version](group_id, topics_partitions) else: raise NotImplementedError( "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.".format(version) ) - return self.check._send_request_to_node(group_coordinator_id, request, wakeup=False) + return self._send_request_to_node(group_coordinator_id, request, wakeup=False) def _send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): """Emit an event to the Datadog Event Stream.""" From 444e3845726c7dec2c2e6d1cd0aff88e68f44873 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Mon, 13 Feb 2023 16:08:55 -0500 Subject: [PATCH 10/12] Refactor client creation (#13946) * Refactor client creation * Add back e2e test * Remove commented out line --- .../kafka_consumer/client/kafka_client.py | 28 +- .../client/kafka_client_factory.py | 6 + .../client/kafka_python_client.py | 366 ++++++++------ .../kafka_consumer/kafka_consumer.py | 453 +----------------- 4 files changed, 263 insertions(+), 590 deletions(-) create mode 100644 kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py index 6ad5525f1b150..247bb54bd46c1 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py @@ -1,21 +1,37 @@ # (C) Datadog, Inc. 
2023-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) -class KafkaClient: - def __init__(self) -> None: - pass +from abc import ABC, abstractmethod + + +class KafkaClient(ABC): + def __init__(self, check) -> None: + self.check = check + self.log = check.log + self._kafka_client = None + self._highwater_offsets = {} + self._consumer_offsets = {} + self._context_limit = check._context_limit + + def should_get_highwater_offsets(self): + return len(self._consumer_offsets) < self._context_limit + @abstractmethod def get_consumer_offsets(self): pass - def get_broker_offset(self): + @abstractmethod + def get_highwater_offsets(self): pass - def report_consumer_offset_and_lag(self): + @abstractmethod + def report_consumer_offsets_and_lag(self): pass - def report_broker_offset(self): + @abstractmethod + def report_highwater_offsets(self): pass + @abstractmethod def collect_broker_metadata(self): pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py new file mode 100644 index 0000000000000..d2c7912f3a89b --- /dev/null +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client_factory.py @@ -0,0 +1,6 @@ +from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient +from datadog_checks.kafka_consumer.client.kafka_python_client import KafkaPythonClient + + +def make_client(check) -> KafkaClient: + return KafkaPythonClient(check) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py index 0f2299bff4d9b..3837181a67dca 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py @@ -1,45 +1,155 @@ # (C) Datadog, Inc. 
2023-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) +import ssl from collections import defaultdict from time import time +from kafka import KafkaAdminClient from kafka import errors as kafka_errors +from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import ListGroupsRequest from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse from kafka.structs import TopicPartition from six import iteritems, string_types -from datadog_checks.base import AgentCheck, ConfigurationError -from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS +from datadog_checks.base import ConfigurationError +from datadog_checks.base.utils.http import AuthTokenOAuthReader +from datadog_checks.kafka_consumer.client.kafka_client import KafkaClient +from datadog_checks.kafka_consumer.constants import DEFAULT_KAFKA_TIMEOUT, KAFKA_INTERNAL_TOPICS -class KafkaPythonClient: +class OAuthTokenProvider(AbstractTokenProvider): + def __init__(self, **config): + self.reader = AuthTokenOAuthReader(config) + + def token(self): + # Read only if necessary or use cached token + return self.reader.read() or self.reader._token + + +class KafkaPythonClient(KafkaClient): def __init__(self, check) -> None: self.check = check self.log = check.log - self.kafka_client = check.kafka_client + self._kafka_client = None + self._highwater_offsets = {} + self._consumer_offsets = {} + self._context_limit = check._context_limit - @AgentCheck.metadata_entrypoint - def collect_broker_metadata(self): - return self._collect_broker_metadata + def get_consumer_offsets(self): + """Fetch Consumer Group offsets from Kafka. - def _collect_broker_metadata(self): - version_data = [str(part) for part in self.kafka_client._client.check_version()] - version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)} + Also fetch consumer_groups, topics, and partitions if not already specified. - self.set_metadata( - 'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts - ) + For speed, all the brokers are queried in parallel using callbacks. + The callback flow is: + A: When fetching all groups ('monitor_unlisted_consumer_groups' is True): + 1. Issue a ListGroupsRequest to every broker + 2. Attach a callback to each ListGroupsRequest that issues OffsetFetchRequests for every group. + Note: Because a broker only returns groups for which it is the coordinator, as an optimization we + skip the FindCoordinatorRequest + B: When fetching only listed groups: + 1. Issue a FindCoordintorRequest for each group + 2. Attach a callback to each FindCoordinatorResponse that issues OffsetFetchRequests for that group + Both: + 3. Attach a callback to each OffsetFetchRequest that parses the response + and saves the consumer group's offsets + """ + # Store the list of futures on the object because some of the callbacks create/store additional futures and they + # don't have access to variables scoped to this method, only to the object scope + self._consumer_futures = [] - def get_consumer_offsets(self): - return self._get_consumer_offsets + if self.check._monitor_unlisted_consumer_groups: + for broker in self.kafka_client._client.cluster.brokers(): + # FIXME: This is using a workaround to skip socket wakeup, which causes blocking + # (see https://github.com/dpkp/kafka-python/issues/2286). 
+ # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official + # implementation for this function instead. + list_groups_future = self._list_consumer_groups_send_request(broker.nodeId) + list_groups_future.add_callback(self._list_groups_callback, broker.nodeId) + self._consumer_futures.append(list_groups_future) + elif self.check._consumer_groups: + self._validate_consumer_groups() + for consumer_group in self.check._consumer_groups: + find_coordinator_future = self._find_coordinator_id_send_request(consumer_group) + find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group) + self._consumer_futures.append(find_coordinator_future) + else: + raise ConfigurationError( + "Cannot fetch consumer offsets because no consumer_groups are specified and " + "monitor_unlisted_consumer_groups is %s." % self.check._monitor_unlisted_consumer_groups + ) + + # Loop until all futures resolved. + self.kafka_client._wait_for_futures(self._consumer_futures) + del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs + + def report_consumer_offsets_and_lag(self, contexts_limit): + """Report the consumer offsets and consumer lag.""" + reported_contexts = 0 + self.log.debug("Reporting consumer offsets and lag metrics") + for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items(): + if reported_contexts >= contexts_limit: + self.log.debug( + "Reported contexts number %s greater than or equal to contexts limit of %s, returning", + str(reported_contexts), + str(contexts_limit), + ) + return + consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group] + consumer_group_tags.extend(self.check._custom_tags) + + partitions = self.kafka_client._client.cluster.partitions_for_topic(topic) + self.log.debug("Received partitions %s for topic %s", partitions, topic) + if partitions is not None and partition in partitions: + # report consumer offset if the partition is valid because even if leaderless the consumer offset will + # be valid once the leader failover completes + self.check.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags) + reported_contexts += 1 - def get_broker_offset(self): - return self._get_broker_offset + if (topic, partition) not in self._highwater_offsets: + self.log.warning( + "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset " + "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.", + consumer_group, + topic, + partition, + ) + continue + producer_offset = self._highwater_offsets[(topic, partition)] + consumer_lag = producer_offset - consumer_offset + if reported_contexts < contexts_limit: + self.check.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags) + reported_contexts += 1 - def _get_broker_offset(self): + if consumer_lag < 0: + # this will effectively result in data loss, so emit an event for max visibility + title = "Negative consumer lag for group: {}.".format(consumer_group) + message = ( + "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. 
This should never " + "happen and will result in the consumer skipping new messages until the lag turns " + "positive.".format(consumer_group, topic, partition) + ) + key = "{}:{}:{}".format(consumer_group, topic, partition) + self._send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error") + self.log.debug(message) + else: + if partitions is None: + msg = ( + "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions " + "in the cluster, so skipping reporting these offsets." + ) + else: + msg = ( + "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't " + "included in the cluster partitions, so skipping reporting these offsets." + ) + self.log.warning(msg, consumer_group, topic, partition) + self.kafka_client._client.cluster.request_update() # force metadata update on next poll() + + def get_highwater_offsets(self): """Fetch highwater offsets for topic_partitions in the Kafka cluster. Do this for all partitions in the cluster because even if it has no consumers, we may want to measure whether @@ -66,7 +176,7 @@ def _get_broker_offset(self): # which this run of the check has at least once saved consumer offset. This is later used as a filter for # excluding partitions. if not self.check._monitor_all_broker_highwatermarks: - tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self.check._consumer_offsets} + tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self._consumer_offsets} for batch in self.batchify(self.kafka_client._client.cluster.brokers(), self.check._broker_requests_batch_size): for broker in batch: @@ -104,6 +214,91 @@ def _get_broker_offset(self): # Loop until all futures resolved. self.kafka_client._wait_for_futures(highwater_futures) + def report_highwater_offsets(self, contexts_limit): + """Report the broker highwater offsets.""" + reported_contexts = 0 + self.log.debug("Reporting broker offset metric") + for (topic, partition), highwater_offset in self._highwater_offsets.items(): + broker_tags = ['topic:%s' % topic, 'partition:%s' % partition] + broker_tags.extend(self.check._custom_tags) + self.check.gauge('broker_offset', highwater_offset, tags=broker_tags) + reported_contexts += 1 + if reported_contexts == contexts_limit: + return + + def create_kafka_admin_client(self): + return self._create_kafka_client(clazz=KafkaAdminClient) + + def _create_kafka_admin_client(self, api_version): + """Return a KafkaAdminClient.""" + # TODO accept None (which inherits kafka-python default of localhost:9092) + kafka_admin_client = self.create_kafka_admin_client() + self.log.debug("KafkaAdminClient api_version: %s", kafka_admin_client.config['api_version']) + # Force initial population of the local cluster metadata cache + kafka_admin_client._client.poll(future=kafka_admin_client._client.cluster.request_update()) + if kafka_admin_client._client.cluster.topics(exclude_internal_topics=False) is None: + raise RuntimeError("Local cluster metadata cache did not populate.") + return kafka_admin_client + + def _create_kafka_client(self, clazz): + kafka_connect_str = self.check.instance.get('kafka_connect_str') + if not isinstance(kafka_connect_str, (string_types, list)): + raise ConfigurationError('kafka_connect_str should be string or list of strings') + kafka_version = self.check.instance.get('kafka_client_api_version') + if isinstance(kafka_version, str): + kafka_version = tuple(map(int, kafka_version.split("."))) + + tls_context = 
self.check.get_tls_context() + crlfile = self.check.instance.get('ssl_crlfile', self.check.instance.get('tls_crlfile')) + if crlfile: + tls_context.load_verify_locations(crlfile) + tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF + + return clazz( + bootstrap_servers=kafka_connect_str, + client_id='dd-agent', + request_timeout_ms=self.check.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000, + # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for + # broker version during the bootstrapping process. Note that this returns the first version found, so in + # a mixed-version cluster this will be a non-deterministic result. + api_version=kafka_version, + # While we check for SASL/SSL params, if not present they will default to the kafka-python values for + # plaintext connections + security_protocol=self.check.instance.get('security_protocol', 'PLAINTEXT'), + sasl_mechanism=self.check.instance.get('sasl_mechanism'), + sasl_plain_username=self.check.instance.get('sasl_plain_username'), + sasl_plain_password=self.check.instance.get('sasl_plain_password'), + sasl_kerberos_service_name=self.check.instance.get('sasl_kerberos_service_name', 'kafka'), + sasl_kerberos_domain_name=self.check.instance.get('sasl_kerberos_domain_name'), + sasl_oauth_token_provider=( + OAuthTokenProvider(**self.check.instance['sasl_oauth_token_provider']) + if 'sasl_oauth_token_provider' in self.check.instance + else None + ), + ssl_context=tls_context, + ) + + @property + def kafka_client(self): + if self._kafka_client is None: + # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for + # broker version during the bootstrapping process. Note that this returns the first version found, so in + # a mixed-version cluster this will be a non-deterministic result. + kafka_version = self.check.instance.get('kafka_client_api_version') + if isinstance(kafka_version, str): + kafka_version = tuple(map(int, kafka_version.split("."))) + + self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version) + return self._kafka_client + + def collect_broker_metadata(self): + version_data = [str(part) for part in self.kafka_client._client.check_version()] + version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)} + + self.check.set_metadata( + 'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts + ) + def _highwater_offsets_callback(self, response): """Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict.""" if type(response) not in OffsetResponse: @@ -112,7 +307,7 @@ def _highwater_offsets_callback(self, response): for partition, error_code, offsets in partitions_data: error_type = kafka_errors.for_code(error_code) if error_type is kafka_errors.NoError: - self.check._highwater_offsets[(topic, partition)] = offsets[0] + self._highwater_offsets[(topic, partition)] = offsets[0] elif error_type is kafka_errors.NotLeaderForPartitionError: self.log.warning( "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. 
This should only happen " @@ -156,24 +351,6 @@ def _send_request_to_node(self, node_id, request, wakeup=True): self.kafka_client._client.poll() return self.kafka_client._client.send(node_id, request, wakeup=wakeup) - def report_consumer_offsets_and_lag(self): - return self._report_consumer_offsets_and_lag - - def report_broker_offset(self, contexts_limit): - return self._report_broker_offset - - def _report_broker_offset(self, contexts_limit): - """Report the broker highwater offsets.""" - reported_contexts = 0 - self.log.debug("Reporting broker offset metric") - for (topic, partition), highwater_offset in self._highwater_offsets.items(): - broker_tags = ['topic:%s' % topic, 'partition:%s' % partition] - broker_tags.extend(self.check._custom_tags) - self.check.gauge('broker_offset', highwater_offset, tags=broker_tags) - reported_contexts += 1 - if reported_contexts == contexts_limit: - return - def _validate_consumer_groups(self): """Validate any explicitly specified consumer groups. @@ -191,117 +368,6 @@ def _validate_consumer_groups(self): for partition in partitions: assert isinstance(partition, int) - def _report_consumer_offsets_and_lag(self, contexts_limit): - """Report the consumer offsets and consumer lag.""" - reported_contexts = 0 - self.log.debug("Reporting consumer offsets and lag metrics") - for (consumer_group, topic, partition), consumer_offset in self.check._consumer_offsets.items(): - if reported_contexts >= contexts_limit: - self.log.debug( - "Reported contexts number %s greater than or equal to contexts limit of %s, returning", - str(reported_contexts), - str(contexts_limit), - ) - return - consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group] - consumer_group_tags.extend(self.check._custom_tags) - - partitions = self.kafka_client._client.cluster.partitions_for_topic(topic) - self.log.debug("Received partitions %s for topic %s", partitions, topic) - if partitions is not None and partition in partitions: - # report consumer offset if the partition is valid because even if leaderless the consumer offset will - # be valid once the leader failover completes - self.check.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags) - reported_contexts += 1 - - if (topic, partition) not in self.check._highwater_offsets: - self.log.warning( - "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset " - "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.", - consumer_group, - topic, - partition, - ) - continue - producer_offset = self.check._highwater_offsets[(topic, partition)] - consumer_lag = producer_offset - consumer_offset - if reported_contexts < contexts_limit: - self.check.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags) - reported_contexts += 1 - - if consumer_lag < 0: - # this will effectively result in data loss, so emit an event for max visibility - title = "Negative consumer lag for group: {}.".format(consumer_group) - message = ( - "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. 
This should never " - "happen and will result in the consumer skipping new messages until the lag turns " - "positive.".format(consumer_group, topic, partition) - ) - key = "{}:{}:{}".format(consumer_group, topic, partition) - self._send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error") - self.log.debug(message) - else: - if partitions is None: - msg = ( - "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions " - "in the cluster, so skipping reporting these offsets." - ) - else: - msg = ( - "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't " - "included in the cluster partitions, so skipping reporting these offsets." - ) - self.log.warning(msg, consumer_group, topic, partition) - self.kafka_client._client.cluster.request_update() # force metadata update on next poll() - - def _get_consumer_offsets(self): - """Fetch Consumer Group offsets from Kafka. - - Also fetch consumer_groups, topics, and partitions if not already specified. - - For speed, all the brokers are queried in parallel using callbacks. - The callback flow is: - A: When fetching all groups ('monitor_unlisted_consumer_groups' is True): - 1. Issue a ListGroupsRequest to every broker - 2. Attach a callback to each ListGroupsRequest that issues OffsetFetchRequests for every group. - Note: Because a broker only returns groups for which it is the coordinator, as an optimization we - skip the FindCoordinatorRequest - B: When fetching only listed groups: - 1. Issue a FindCoordintorRequest for each group - 2. Attach a callback to each FindCoordinatorResponse that issues OffsetFetchRequests for that group - Both: - 3. Attach a callback to each OffsetFetchRequest that parses the response - and saves the consumer group's offsets - """ - # Store the list of futures on the object because some of the callbacks create/store additional futures and they - # don't have access to variables scoped to this method, only to the object scope - self._consumer_futures = [] - - if self.check._monitor_unlisted_consumer_groups: - for broker in self.kafka_client._client.cluster.brokers(): - # FIXME: This is using a workaround to skip socket wakeup, which causes blocking - # (see https://github.com/dpkp/kafka-python/issues/2286). - # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official - # implementation for this function instead. - list_groups_future = self._list_consumer_groups_send_request(broker.nodeId) - list_groups_future.add_callback(self._list_groups_callback, broker.nodeId) - self._consumer_futures.append(list_groups_future) - elif self.check._consumer_groups: - self._validate_consumer_groups() - for consumer_group in self.check._consumer_groups: - find_coordinator_future = self._find_coordinator_id_send_request(consumer_group) - find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group) - self._consumer_futures.append(find_coordinator_future) - else: - raise ConfigurationError( - "Cannot fetch consumer offsets because no consumer_groups are specified and " - "monitor_unlisted_consumer_groups is %s." % self.check._monitor_unlisted_consumer_groups - ) - - # Loop until all futures resolved. 
- self.kafka_client._wait_for_futures(self._consumer_futures) - del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs - def _list_groups_callback(self, broker_id, response): """Callback that takes a ListGroupsResponse and issues an OffsetFetchRequest for each group. @@ -362,7 +428,7 @@ def _single_group_offsets_callback(self, consumer_group, response): self.kafka_client._client.cluster.request_update() # force metadata update on next poll() continue key = (consumer_group, topic, partition) - self.check._consumer_offsets[key] = offset + self._consumer_offsets[key] = offset def _list_consumer_groups_send_request(self, broker_id): kafka_version = self.kafka_client._matching_api_version(ListGroupsRequest) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index a8415c25df158..18be6209951c1 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -2,23 +2,16 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) import ssl -from collections import defaultdict -from time import time -import six from kafka import KafkaAdminClient, KafkaClient -from kafka import errors as kafka_errors from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import ListGroupsRequest -from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest -from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy, OffsetResponse -from kafka.structs import TopicPartition from six import string_types from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative from datadog_checks.base.utils.http import AuthTokenOAuthReader +from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client -from .constants import BROKER_REQUESTS_BATCH_SIZE, CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT, KAFKA_INTERNAL_TOPICS +from .constants import BROKER_REQUESTS_BATCH_SIZE, CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT class OAuthTokenProvider(AbstractTokenProvider): @@ -55,62 +48,14 @@ def __init__(self, name, init_config, instances): ) self._consumer_groups = self.instance.get('consumer_groups', {}) self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) - self._kafka_client = None self.client = None - # KafkaCheck - # - check() - # - create_kafka_client() - return KafkaClient - - # KafkaClient - # - get_consumer_offset_and_lag() - # - get_broker_offset() - # - report_consumer_offset_and_lag() - # - report_broker_offset() - - # KafkaPythonClient - # actual implementation - # "_report_highwater_offsets()" - - # ConfluentKafkaClient - # actual implementation - - def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): - """Emit an event to the Datadog Event Stream.""" - event_dict = { - 'timestamp': int(time()), - 'msg_title': title, - 'event_type': event_type, - 'alert_type': severity, - 'msg_text': text, - 'tags': tags, - 'aggregation_key': aggregation_key, - } - self.event(event_dict) - def create_kafka_client(self): return self._create_kafka_client(clazz=KafkaClient) def create_kafka_admin_client(self): return self._create_kafka_client(clazz=KafkaAdminClient) - def validate_consumer_groups(self): - """Validate any explicitly specified consumer groups. 
- - consumer_groups = {'consumer_group': {'topic': [0, 1]}} - """ - assert isinstance(self._consumer_groups, dict) - for consumer_group, topics in self._consumer_groups.items(): - assert isinstance(consumer_group, string_types) - assert isinstance(topics, dict) or topics is None # topics are optional - if topics is not None: - for topic, partitions in topics.items(): - assert isinstance(topic, string_types) - assert isinstance(partitions, (list, tuple)) or partitions is None # partitions are optional - if partitions is not None: - for partition in partitions: - assert isinstance(partition, int) - def _create_kafka_client(self, clazz): kafka_connect_str = self.instance.get('kafka_connect_str') if not isinstance(kafka_connect_str, (string_types, list)): @@ -166,24 +111,19 @@ def check(self, _): """The main entrypoint of the check.""" self._consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset} self._highwater_offsets = {} # Expected format: {(topic, partition): offset} - - # For calculating consumer lag, we have to fetch both the consumer offset and the broker highwater offset. - # There's a potential race condition because whichever one we check first may be outdated by the time we check - # the other. Better to check consumer offsets before checking broker offsets because worst case is that - # overstates consumer lag a little. Doing it the other way can understate consumer lag to the point of having - # negative consumer lag, which just creates confusion because it's theoretically impossible. + self.client = make_client(self) # Fetch Kafka consumer offsets try: - self._get_consumer_offsets() + self.client.get_consumer_offsets() except Exception: self.log.exception("There was a problem collecting consumer offsets from Kafka.") # don't raise because we might get valid broker offsets # Fetch the broker highwater offsets try: - if len(self._consumer_offsets) < self._context_limit: - self._get_highwater_offsets() + if self.client.should_get_highwater_offsets(): + self.client.get_highwater_offsets() else: self.warning("Context limit reached. Skipping highwater offset collection.") except Exception: @@ -191,7 +131,7 @@ def check(self, _): # Unlike consumer offsets, fail immediately because we can't calculate consumer lag w/o highwater_offsets raise - total_contexts = len(self._consumer_offsets) + len(self._highwater_offsets) + total_contexts = len(self.client._consumer_offsets) + len(self.client._highwater_offsets) if total_contexts >= self._context_limit: self.warning( """Discovered %s metric contexts - this exceeds the maximum number of %s contexts permitted by the @@ -202,10 +142,14 @@ def check(self, _): ) # Report the metrics - self._report_highwater_offsets(self._context_limit) - self._report_consumer_offsets_and_lag(self._context_limit - len(self._highwater_offsets)) + self.client.report_highwater_offsets(self._context_limit) + self.client.report_consumer_offsets_and_lag(self._context_limit - len(self.client._highwater_offsets)) + + self.collect_broker_metadata() - self._collect_broker_metadata() + @AgentCheck.metadata_entrypoint + def collect_broker_metadata(self): + self.client.collect_broker_metadata() def _create_kafka_admin_client(self, api_version): """Return a KafkaAdminClient.""" @@ -218,367 +162,8 @@ def _create_kafka_admin_client(self, api_version): raise RuntimeError("Local cluster metadata cache did not populate.") return kafka_admin_client - def _get_highwater_offsets(self): - """Fetch highwater offsets for topic_partitions in the Kafka cluster. 
- - Do this for all partitions in the cluster because even if it has no consumers, we may want to measure whether - producers are successfully producing. - - If monitor_all_broker_highwatermarks is True, will fetch for all partitions in the cluster. Otherwise highwater - mark offsets will only be fetched for topic partitions where this check run has already fetched a consumer - offset. - - Internal Kafka topics like __consumer_offsets, __transaction_state, etc are always excluded. - - Any partitions that don't currently have a leader will be skipped. - - Sends one OffsetRequest per broker to get offsets for all partitions where that broker is the leader: - https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset) - - For speed, all the brokers are queried in parallel using callbacks. The callback flow is: - 1. Issue an OffsetRequest to every broker - 2. Attach a callback to each OffsetResponse that parses the response and saves the highwater offsets. - """ - highwater_futures = [] # No need to store on object because the callbacks don't create additional futures - - # If we aren't fetching all broker highwater offsets, then construct the unique set of topic partitions for - # which this run of the check has at least once saved consumer offset. This is later used as a filter for - # excluding partitions. - if not self._monitor_all_broker_highwatermarks: - tps_with_consumer_offset = {(topic, partition) for (_, topic, partition) in self._consumer_offsets} - - for batch in self.batchify(self.kafka_client._client.cluster.brokers(), self._broker_requests_batch_size): - for broker in batch: - broker_led_partitions = self.kafka_client._client.cluster.partitions_for_broker(broker.nodeId) - if broker_led_partitions is None: - continue - - # Take the partitions for which this broker is the leader and group them by topic in order to construct - # the OffsetRequest while simultaneously filtering out partitions we want to exclude - partitions_grouped_by_topic = defaultdict(list) - for topic, partition in broker_led_partitions: - # No sense fetching highwater offsets for internal topics - if topic not in KAFKA_INTERNAL_TOPICS and ( - self._monitor_all_broker_highwatermarks or (topic, partition) in tps_with_consumer_offset - ): - partitions_grouped_by_topic[topic].append(partition) - - # Construct the OffsetRequest - max_offsets = 1 - request = OffsetRequest[0]( - replica_id=-1, - topics=[ - (topic, [(partition, OffsetResetStrategy.LATEST, max_offsets) for partition in partitions]) - for topic, partitions in partitions_grouped_by_topic.items() - ], - ) - - # We can disable wakeup here because it is the same thread doing both polling and sending. Also, it - # is possible that the wakeup itself could block if a large number of sends were processed beforehand. - highwater_future = self._send_request_to_node(node_id=broker.nodeId, request=request, wakeup=False) - - highwater_future.add_callback(self._highwater_offsets_callback) - highwater_futures.append(highwater_future) - - # Loop until all futures resolved. - self.kafka_client._wait_for_futures(highwater_futures) - - def _highwater_offsets_callback(self, response): - """Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict.""" - if type(response) not in OffsetResponse: - raise RuntimeError("response type should be OffsetResponse, but instead was %s." 
% type(response)) - for topic, partitions_data in response.topics: - for partition, error_code, offsets in partitions_data: - error_type = kafka_errors.for_code(error_code) - if error_type is kafka_errors.NoError: - self._highwater_offsets[(topic, partition)] = offsets[0] - elif error_type is kafka_errors.NotLeaderForPartitionError: - self.log.warning( - "Kafka broker returned %s (error_code %s) for topic %s, partition: %s. This should only happen " - "if the broker that was the partition leader when kafka_admin_client last fetched metadata is " - "no longer the leader.", - error_type.message, - error_type.errno, - topic, - partition, - ) - self.kafka_client._client.cluster.request_update() # force metadata update on next poll() - elif error_type is kafka_errors.UnknownTopicOrPartitionError: - self.log.warning( - "Kafka broker returned %s (error_code %s) for topic: %s, partition: %s. This should only " - "happen if the topic is currently being deleted or the check configuration lists non-existent " - "topic partitions.", - error_type.message, - error_type.errno, - topic, - partition, - ) - else: - raise error_type( - "Unexpected error encountered while attempting to fetch the highwater offsets for topic: %s, " - "partition: %s." % (topic, partition) - ) - - def _report_highwater_offsets(self, contexts_limit): - """Report the broker highwater offsets.""" - reported_contexts = 0 - self.log.debug("Reporting broker offset metric") - for (topic, partition), highwater_offset in self._highwater_offsets.items(): - broker_tags = ['topic:%s' % topic, 'partition:%s' % partition] - broker_tags.extend(self._custom_tags) - self.gauge('broker_offset', highwater_offset, tags=broker_tags) - reported_contexts += 1 - if reported_contexts == contexts_limit: - return - - def _report_consumer_offsets_and_lag(self, contexts_limit): - """Report the consumer offsets and consumer lag.""" - reported_contexts = 0 - self.log.debug("Reporting consumer offsets and lag metrics") - for (consumer_group, topic, partition), consumer_offset in self._consumer_offsets.items(): - if reported_contexts >= contexts_limit: - self.log.debug( - "Reported contexts number %s greater than or equal to contexts limit of %s, returning", - str(reported_contexts), - str(contexts_limit), - ) - return - consumer_group_tags = ['topic:%s' % topic, 'partition:%s' % partition, 'consumer_group:%s' % consumer_group] - consumer_group_tags.extend(self._custom_tags) - - partitions = self.kafka_client._client.cluster.partitions_for_topic(topic) - self.log.debug("Received partitions %s for topic %s", partitions, topic) - if partitions is not None and partition in partitions: - # report consumer offset if the partition is valid because even if leaderless the consumer offset will - # be valid once the leader failover completes - self.gauge('consumer_offset', consumer_offset, tags=consumer_group_tags) - reported_contexts += 1 - - if (topic, partition) not in self._highwater_offsets: - self.log.warning( - "Consumer group: %s has offsets for topic: %s partition: %s, but no stored highwater offset " - "(likely the partition is in the middle of leader failover) so cannot calculate consumer lag.", - consumer_group, - topic, - partition, - ) - continue - producer_offset = self._highwater_offsets[(topic, partition)] - consumer_lag = producer_offset - consumer_offset - if reported_contexts < contexts_limit: - self.gauge('consumer_lag', consumer_lag, tags=consumer_group_tags) - reported_contexts += 1 - - if consumer_lag < 0: - # this will effectively result in data 
loss, so emit an event for max visibility - title = "Negative consumer lag for group: {}.".format(consumer_group) - message = ( - "Consumer group: {}, topic: {}, partition: {} has negative consumer lag. This should never " - "happen and will result in the consumer skipping new messages until the lag turns " - "positive.".format(consumer_group, topic, partition) - ) - key = "{}:{}:{}".format(consumer_group, topic, partition) - self.send_event(title, message, consumer_group_tags, 'consumer_lag', key, severity="error") - self.log.debug(message) - else: - if partitions is None: - msg = ( - "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic has no partitions " - "in the cluster, so skipping reporting these offsets." - ) - else: - msg = ( - "Consumer group: %s has offsets for topic: %s, partition: %s, but that topic partition isn't " - "included in the cluster partitions, so skipping reporting these offsets." - ) - self.log.warning(msg, consumer_group, topic, partition) - self.kafka_client._client.cluster.request_update() # force metadata update on next poll() - - def _get_consumer_offsets(self): - """Fetch Consumer Group offsets from Kafka. - - Also fetch consumer_groups, topics, and partitions if not already specified. - - For speed, all the brokers are queried in parallel using callbacks. - The callback flow is: - A: When fetching all groups ('monitor_unlisted_consumer_groups' is True): - 1. Issue a ListGroupsRequest to every broker - 2. Attach a callback to each ListGroupsRequest that issues OffsetFetchRequests for every group. - Note: Because a broker only returns groups for which it is the coordinator, as an optimization we - skip the FindCoordinatorRequest - B: When fetching only listed groups: - 1. Issue a FindCoordintorRequest for each group - 2. Attach a callback to each FindCoordinatorResponse that issues OffsetFetchRequests for that group - Both: - 3. Attach a callback to each OffsetFetchRequest that parses the response - and saves the consumer group's offsets - """ - # Store the list of futures on the object because some of the callbacks create/store additional futures and they - # don't have access to variables scoped to this method, only to the object scope - self._consumer_futures = [] - - if self._monitor_unlisted_consumer_groups: - for broker in self.kafka_client._client.cluster.brokers(): - # FIXME: This is using a workaround to skip socket wakeup, which causes blocking - # (see https://github.com/dpkp/kafka-python/issues/2286). - # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official - # implementation for this function instead. - list_groups_future = self._list_consumer_groups_send_request(broker.nodeId) - list_groups_future.add_callback(self._list_groups_callback, broker.nodeId) - self._consumer_futures.append(list_groups_future) - elif self._consumer_groups: - self.validate_consumer_groups() - for consumer_group in self._consumer_groups: - find_coordinator_future = self._find_coordinator_id_send_request(consumer_group) - find_coordinator_future.add_callback(self._find_coordinator_callback, consumer_group) - self._consumer_futures.append(find_coordinator_future) - else: - raise ConfigurationError( - "Cannot fetch consumer offsets because no consumer_groups are specified and " - "monitor_unlisted_consumer_groups is %s." % self._monitor_unlisted_consumer_groups - ) - - # Loop until all futures resolved. 
- self.kafka_client._wait_for_futures(self._consumer_futures) - del self._consumer_futures # since it's reset on every check run, no sense holding the reference between runs - - def _list_groups_callback(self, broker_id, response): - """Callback that takes a ListGroupsResponse and issues an OffsetFetchRequest for each group. - - broker_id must be manually passed in because it is not present in the response. Keeping track of the broker that - gave us this response lets us skip issuing FindCoordinatorRequests because Kafka brokers only include - consumer groups in their ListGroupsResponse when they are the coordinator for that group. - """ - for consumer_group, group_type in self.kafka_client._list_consumer_groups_process_response(response): - # consumer groups from Kafka < 0.9 that store their offset in Kafka don't use Kafka for group-coordination - # so their group_type is empty - if group_type in ('consumer', ''): - single_group_offsets_future = self._list_consumer_group_offsets_send_request( - group_id=consumer_group, group_coordinator_id=broker_id - ) - single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group) - self._consumer_futures.append(single_group_offsets_future) - - def _find_coordinator_callback(self, consumer_group, response): - """Callback that takes a FindCoordinatorResponse and issues an OffsetFetchRequest for the group. - - consumer_group must be manually passed in because it is not present in the response, but we need it in order to - associate these offsets to the proper consumer group. - - The OffsetFetchRequest is scoped to the topics and partitions that are specified in the check config. If - topics are unspecified, it will fetch all known offsets for that consumer group. Similarly, if the partitions - are unspecified for a topic listed in the config, offsets are fetched for all the partitions within that topic. - """ - coordinator_id = self.kafka_client._find_coordinator_id_process_response(response) - topics = self._consumer_groups[consumer_group] - if not topics: - topic_partitions = None # None signals to fetch all known offsets for the consumer group - else: - # transform [("t1", [1, 2])] into [TopicPartition("t1", 1), TopicPartition("t1", 2)] - topic_partitions = [] - for topic, partitions in topics.items(): - if not partitions: # If partitions aren't specified, fetch all partitions in the topic - partitions = self.kafka_client._client.cluster.partitions_for_topic(topic) - topic_partitions.extend([TopicPartition(topic, p) for p in partitions]) - single_group_offsets_future = self._list_consumer_group_offsets_send_request( - group_id=consumer_group, group_coordinator_id=coordinator_id, partitions=topic_partitions - ) - single_group_offsets_future.add_callback(self._single_group_offsets_callback, consumer_group) - self._consumer_futures.append(single_group_offsets_future) - - def _single_group_offsets_callback(self, consumer_group, response): - """Callback that parses an OffsetFetchResponse and saves it to the consumer_offsets dict. - - consumer_group must be manually passed in because it is not present in the response, but we need it in order to - associate these offsets to the proper consumer group. 
- """ - single_group_offsets = self.kafka_client._list_consumer_group_offsets_process_response(response) - self.log.debug("Single group offsets: %s", single_group_offsets) - for (topic, partition), (offset, _metadata) in single_group_offsets.items(): - # If the OffsetFetchRequest explicitly specified partitions, the offset could returned as -1, meaning there - # is no recorded offset for that partition... for example, if the partition doesn't exist in the cluster. - # So ignore it. - if offset == -1: - self.kafka_client._client.cluster.request_update() # force metadata update on next poll() - continue - key = (consumer_group, topic, partition) - self._consumer_offsets[key] = offset - - @AgentCheck.metadata_entrypoint - def _collect_broker_metadata(self): - version_data = [str(part) for part in self.kafka_client._client.check_version()] - version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)} - - self.set_metadata( - 'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts - ) - - @staticmethod - def batchify(iterable, batch_size): - iterable = list(iterable) - return (iterable[i : i + batch_size] for i in range(0, len(iterable), batch_size)) - - # FIXME: This is using a workaround to skip socket wakeup, which causes blocking - # (see https://github.com/dpkp/kafka-python/issues/2286). - # Once https://github.com/dpkp/kafka-python/pull/2335 is merged in, we can use the official - # implementation for this function instead. - def _send_request_to_node(self, node_id, request, wakeup=True): - while not self.kafka_client._client.ready(node_id): - # poll until the connection to broker is ready, otherwise send() - # will fail with NodeNotReadyError - self.kafka_client._client.poll() - return self.kafka_client._client.send(node_id, request, wakeup=wakeup) - - def _list_consumer_groups_send_request(self, broker_id): - kafka_version = self.kafka_client._matching_api_version(ListGroupsRequest) - if kafka_version <= 2: - request = ListGroupsRequest[kafka_version]() - else: - raise NotImplementedError( - "Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient.".format(kafka_version) - ) - # Disable wakeup when sending request to prevent blocking send requests - return self._send_request_to_node(broker_id, request, wakeup=False) - - def _find_coordinator_id_send_request(self, group_id): - """Send a FindCoordinatorRequest to a broker. - :param group_id: The consumer group ID. This is typically the group - name as a string. - :return: A message future - """ - version = 0 - request = GroupCoordinatorRequest[version](group_id) - return self._send_request_to_node(self.kafka_client._client.least_loaded_node(), request, wakeup=False) - - def _list_consumer_group_offsets_send_request(self, group_id, group_coordinator_id, partitions=None): - """Send an OffsetFetchRequest to a broker. - :param group_id: The consumer group id name for which to fetch offsets. - :param group_coordinator_id: The node_id of the group's coordinator - broker. - :return: A message future - """ - version = self.kafka_client._matching_api_version(OffsetFetchRequest) - if version <= 3: - if partitions is None: - if version <= 1: - raise ValueError( - """OffsetFetchRequest_v{} requires specifying the - partitions for which to fetch offsets. Omitting the - partitions is only supported on brokers >= 0.10.2. 
- For details, see KIP-88.""".format( - version - ) - ) - topics_partitions = None - else: - # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])] - topics_partitions_dict = defaultdict(set) - for topic, partition in partitions: - topics_partitions_dict[topic].add(partition) - topics_partitions = list(six.iteritems(topics_partitions_dict)) - request = OffsetFetchRequest[version](group_id, topics_partitions) - else: - raise NotImplementedError( - "Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.".format(version) - ) - return self._send_request_to_node(group_coordinator_id, request, wakeup=False) + # TODO: Remove me once the tests are refactored + def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): + if self.client is None: + self.client = make_client(self) + self.client._send_event(title, text, tags, event_type, aggregation_key, severity='info') From fbf625124c7aa48d9d0610c841883fc5095b4e63 Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Wed, 15 Feb 2023 10:19:54 -0500 Subject: [PATCH 11/12] Remove KafkaClient and refactor tests (#13954) --- .../kafka_consumer/kafka_consumer.py | 92 +------------------ kafka_consumer/tests/test_kafka_consumer.py | 32 ++++--- 2 files changed, 20 insertions(+), 104 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 18be6209951c1..5098d7dd02986 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -1,26 +1,11 @@ # (C) Datadog, Inc. 2019-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) -import ssl -from kafka import KafkaAdminClient, KafkaClient -from kafka.oauth.abstract import AbstractTokenProvider -from six import string_types - -from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative -from datadog_checks.base.utils.http import AuthTokenOAuthReader +from datadog_checks.base import AgentCheck, is_affirmative from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client -from .constants import BROKER_REQUESTS_BATCH_SIZE, CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT - - -class OAuthTokenProvider(AbstractTokenProvider): - def __init__(self, **config): - self.reader = AuthTokenOAuthReader(config) - - def token(self): - # Read only if necessary or use cached token - return self.reader.read() or self.reader._token +from .constants import BROKER_REQUESTS_BATCH_SIZE, CONTEXT_UPPER_BOUND class KafkaCheck(AgentCheck): @@ -48,70 +33,12 @@ def __init__(self, name, init_config, instances): ) self._consumer_groups = self.instance.get('consumer_groups', {}) self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) - self.client = None - - def create_kafka_client(self): - return self._create_kafka_client(clazz=KafkaClient) - - def create_kafka_admin_client(self): - return self._create_kafka_client(clazz=KafkaAdminClient) - - def _create_kafka_client(self, clazz): - kafka_connect_str = self.instance.get('kafka_connect_str') - if not isinstance(kafka_connect_str, (string_types, list)): - raise ConfigurationError('kafka_connect_str should be string or list of strings') - kafka_version = self.instance.get('kafka_client_api_version') - if isinstance(kafka_version, str): - kafka_version = tuple(map(int, kafka_version.split("."))) - - tls_context = 
self.get_tls_context() - crlfile = self.instance.get('ssl_crlfile', self.instance.get('tls_crlfile')) - if crlfile: - tls_context.load_verify_locations(crlfile) - tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF - - return clazz( - bootstrap_servers=kafka_connect_str, - client_id='dd-agent', - request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000, - # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for - # broker version during the bootstrapping process. Note that this returns the first version found, so in - # a mixed-version cluster this will be a non-deterministic result. - api_version=kafka_version, - # While we check for SASL/SSL params, if not present they will default to the kafka-python values for - # plaintext connections - security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'), - sasl_mechanism=self.instance.get('sasl_mechanism'), - sasl_plain_username=self.instance.get('sasl_plain_username'), - sasl_plain_password=self.instance.get('sasl_plain_password'), - sasl_kerberos_service_name=self.instance.get('sasl_kerberos_service_name', 'kafka'), - sasl_kerberos_domain_name=self.instance.get('sasl_kerberos_domain_name'), - sasl_oauth_token_provider=( - OAuthTokenProvider(**self.instance['sasl_oauth_token_provider']) - if 'sasl_oauth_token_provider' in self.instance - else None - ), - ssl_context=tls_context, - ) - - @property - def kafka_client(self): - if self._kafka_client is None: - # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for - # broker version during the bootstrapping process. Note that this returns the first version found, so in - # a mixed-version cluster this will be a non-deterministic result. 
- kafka_version = self.instance.get('kafka_client_api_version') - if isinstance(kafka_version, str): - kafka_version = tuple(map(int, kafka_version.split("."))) - - self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version) - return self._kafka_client + self.client = make_client(self) def check(self, _): """The main entrypoint of the check.""" self._consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset} self._highwater_offsets = {} # Expected format: {(topic, partition): offset} - self.client = make_client(self) # Fetch Kafka consumer offsets try: @@ -151,19 +78,6 @@ def check(self, _): def collect_broker_metadata(self): self.client.collect_broker_metadata() - def _create_kafka_admin_client(self, api_version): - """Return a KafkaAdminClient.""" - # TODO accept None (which inherits kafka-python default of localhost:9092) - kafka_admin_client = self.create_kafka_admin_client() - self.log.debug("KafkaAdminClient api_version: %s", kafka_admin_client.config['api_version']) - # Force initial population of the local cluster metadata cache - kafka_admin_client._client.poll(future=kafka_admin_client._client.cluster.request_update()) - if kafka_admin_client._client.cluster.topics(exclude_internal_topics=False) is None: - raise RuntimeError("Local cluster metadata cache did not populate.") - return kafka_admin_client - # TODO: Remove me once the tests are refactored def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): - if self.client is None: - self.client = make_client(self) self.client._send_event(title, text, tags, event_type, aggregation_key, severity='info') diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 19d9fb0511125..76e6a70321e8f 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -9,7 +9,7 @@ from datadog_checks.base import ConfigurationError from datadog_checks.kafka_consumer import KafkaCheck -from datadog_checks.kafka_consumer.kafka_consumer import OAuthTokenProvider +from datadog_checks.kafka_consumer.client.kafka_python_client import OAuthTokenProvider from .common import BROKER_METRICS, CONSUMER_METRICS, KAFKA_CONNECT_STR @@ -34,22 +34,22 @@ def test_gssapi(kafka_instance, dd_run_check): @pytest.mark.unit def test_tls_config_ok(kafka_instance_tls): with mock.patch('datadog_checks.base.utils.tls.ssl') as ssl: - with mock.patch('kafka.KafkaClient') as kafka_client: + with mock.patch('kafka.KafkaAdminClient') as kafka_admin_client: # mock Kafka Client - kafka_client.return_value = mock.MagicMock() + kafka_admin_client.return_value = mock.MagicMock() # mock TLS context tls_context = mock.MagicMock() ssl.SSLContext.return_value = tls_context kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance_tls]) - kafka_consumer_check._create_kafka_client(clazz=kafka_client) + kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client) assert tls_context.check_hostname is True assert tls_context.tls_cert is not None assert tls_context.check_hostname is True - assert kafka_consumer_check.create_kafka_client is not None + assert kafka_consumer_check.client.create_kafka_admin_client is not None @pytest.mark.parametrize( @@ -108,15 +108,17 @@ def test_oauth_token_client_config(kafka_instance): "client_secret": "secret", } - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance]) - client = kafka_consumer_check.create_kafka_client() + with 
mock.patch('kafka.KafkaAdminClient') as kafka_admin_client: + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance]) + _ = kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client) + params = kafka_admin_client.call_args_list[0].kwargs - assert client.config['security_protocol'] == 'SASL_PLAINTEXT' - assert client.config['sasl_mechanism'] == 'OAUTHBEARER' - assert isinstance(client.config['sasl_oauth_token_provider'], OAuthTokenProvider) - assert client.config['sasl_oauth_token_provider'].reader._client_id == "id" - assert client.config['sasl_oauth_token_provider'].reader._client_secret == "secret" - assert client.config['sasl_oauth_token_provider'].reader._url == "http://fake.url" + assert params['security_protocol'] == 'SASL_PLAINTEXT' + assert params['sasl_mechanism'] == 'OAUTHBEARER' + assert params['sasl_oauth_token_provider'].reader._client_id == "id" + assert params['sasl_oauth_token_provider'].reader._client_secret == "secret" + assert params['sasl_oauth_token_provider'].reader._url == "http://fake.url" + assert isinstance(params['sasl_oauth_token_provider'], OAuthTokenProvider) @pytest.mark.parametrize( @@ -222,8 +224,8 @@ def test_version_metadata(datadog_agent, kafka_instance, dd_run_check): kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) kafka_consumer_check.check_id = 'test:123' - kafka_client = kafka_consumer_check.create_kafka_client() - version_data = [str(part) for part in kafka_client.check_version()] + kafka_client = kafka_consumer_check.client.create_kafka_admin_client() + version_data = [str(part) for part in kafka_client._client.check_version()] kafka_client.close() version_parts = {'version.{}'.format(name): part for name, part in zip(('major', 'minor', 'patch'), version_data)} version_parts['version.scheme'] = 'semver' From 97a87f0e2930fa612cc1b466603bb23d9b801a4a Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Wed, 15 Feb 2023 11:28:04 -0500 Subject: [PATCH 12/12] Revert "Remove KafkaClient and refactor tests (#13954)" This reverts commit e327d71fb21180731720c0772dff3168af57bc6e. --- .../kafka_consumer/kafka_consumer.py | 92 ++++++++++++++++++- kafka_consumer/tests/test_kafka_consumer.py | 32 +++---- 2 files changed, 104 insertions(+), 20 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 5098d7dd02986..18be6209951c1 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -1,11 +1,26 @@ # (C) Datadog, Inc. 
2019-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) +import ssl -from datadog_checks.base import AgentCheck, is_affirmative +from kafka import KafkaAdminClient, KafkaClient +from kafka.oauth.abstract import AbstractTokenProvider +from six import string_types + +from datadog_checks.base import AgentCheck, ConfigurationError, is_affirmative +from datadog_checks.base.utils.http import AuthTokenOAuthReader from datadog_checks.kafka_consumer.client.kafka_client_factory import make_client -from .constants import BROKER_REQUESTS_BATCH_SIZE, CONTEXT_UPPER_BOUND +from .constants import BROKER_REQUESTS_BATCH_SIZE, CONTEXT_UPPER_BOUND, DEFAULT_KAFKA_TIMEOUT + + +class OAuthTokenProvider(AbstractTokenProvider): + def __init__(self, **config): + self.reader = AuthTokenOAuthReader(config) + + def token(self): + # Read only if necessary or use cached token + return self.reader.read() or self.reader._token class KafkaCheck(AgentCheck): @@ -33,12 +48,70 @@ def __init__(self, name, init_config, instances): ) self._consumer_groups = self.instance.get('consumer_groups', {}) self._broker_requests_batch_size = self.instance.get('broker_requests_batch_size', BROKER_REQUESTS_BATCH_SIZE) - self.client = make_client(self) + self.client = None + + def create_kafka_client(self): + return self._create_kafka_client(clazz=KafkaClient) + + def create_kafka_admin_client(self): + return self._create_kafka_client(clazz=KafkaAdminClient) + + def _create_kafka_client(self, clazz): + kafka_connect_str = self.instance.get('kafka_connect_str') + if not isinstance(kafka_connect_str, (string_types, list)): + raise ConfigurationError('kafka_connect_str should be string or list of strings') + kafka_version = self.instance.get('kafka_client_api_version') + if isinstance(kafka_version, str): + kafka_version = tuple(map(int, kafka_version.split("."))) + + tls_context = self.get_tls_context() + crlfile = self.instance.get('ssl_crlfile', self.instance.get('tls_crlfile')) + if crlfile: + tls_context.load_verify_locations(crlfile) + tls_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF + + return clazz( + bootstrap_servers=kafka_connect_str, + client_id='dd-agent', + request_timeout_ms=self.init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT) * 1000, + # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for + # broker version during the bootstrapping process. Note that this returns the first version found, so in + # a mixed-version cluster this will be a non-deterministic result. 
+ api_version=kafka_version, + # While we check for SASL/SSL params, if not present they will default to the kafka-python values for + # plaintext connections + security_protocol=self.instance.get('security_protocol', 'PLAINTEXT'), + sasl_mechanism=self.instance.get('sasl_mechanism'), + sasl_plain_username=self.instance.get('sasl_plain_username'), + sasl_plain_password=self.instance.get('sasl_plain_password'), + sasl_kerberos_service_name=self.instance.get('sasl_kerberos_service_name', 'kafka'), + sasl_kerberos_domain_name=self.instance.get('sasl_kerberos_domain_name'), + sasl_oauth_token_provider=( + OAuthTokenProvider(**self.instance['sasl_oauth_token_provider']) + if 'sasl_oauth_token_provider' in self.instance + else None + ), + ssl_context=tls_context, + ) + + @property + def kafka_client(self): + if self._kafka_client is None: + # if `kafka_client_api_version` is not set, then kafka-python automatically probes the cluster for + # broker version during the bootstrapping process. Note that this returns the first version found, so in + # a mixed-version cluster this will be a non-deterministic result. + kafka_version = self.instance.get('kafka_client_api_version') + if isinstance(kafka_version, str): + kafka_version = tuple(map(int, kafka_version.split("."))) + + self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version) + return self._kafka_client def check(self, _): """The main entrypoint of the check.""" self._consumer_offsets = {} # Expected format: {(consumer_group, topic, partition): offset} self._highwater_offsets = {} # Expected format: {(topic, partition): offset} + self.client = make_client(self) # Fetch Kafka consumer offsets try: @@ -78,6 +151,19 @@ def check(self, _): def collect_broker_metadata(self): self.client.collect_broker_metadata() + def _create_kafka_admin_client(self, api_version): + """Return a KafkaAdminClient.""" + # TODO accept None (which inherits kafka-python default of localhost:9092) + kafka_admin_client = self.create_kafka_admin_client() + self.log.debug("KafkaAdminClient api_version: %s", kafka_admin_client.config['api_version']) + # Force initial population of the local cluster metadata cache + kafka_admin_client._client.poll(future=kafka_admin_client._client.cluster.request_update()) + if kafka_admin_client._client.cluster.topics(exclude_internal_topics=False) is None: + raise RuntimeError("Local cluster metadata cache did not populate.") + return kafka_admin_client + # TODO: Remove me once the tests are refactored def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): + if self.client is None: + self.client = make_client(self) self.client._send_event(title, text, tags, event_type, aggregation_key, severity='info') diff --git a/kafka_consumer/tests/test_kafka_consumer.py b/kafka_consumer/tests/test_kafka_consumer.py index 76e6a70321e8f..19d9fb0511125 100644 --- a/kafka_consumer/tests/test_kafka_consumer.py +++ b/kafka_consumer/tests/test_kafka_consumer.py @@ -9,7 +9,7 @@ from datadog_checks.base import ConfigurationError from datadog_checks.kafka_consumer import KafkaCheck -from datadog_checks.kafka_consumer.client.kafka_python_client import OAuthTokenProvider +from datadog_checks.kafka_consumer.kafka_consumer import OAuthTokenProvider from .common import BROKER_METRICS, CONSUMER_METRICS, KAFKA_CONNECT_STR @@ -34,22 +34,22 @@ def test_gssapi(kafka_instance, dd_run_check): @pytest.mark.unit def test_tls_config_ok(kafka_instance_tls): with mock.patch('datadog_checks.base.utils.tls.ssl') as ssl: - with 
mock.patch('kafka.KafkaAdminClient') as kafka_admin_client: + with mock.patch('kafka.KafkaClient') as kafka_client: # mock Kafka Client - kafka_admin_client.return_value = mock.MagicMock() + kafka_client.return_value = mock.MagicMock() # mock TLS context tls_context = mock.MagicMock() ssl.SSLContext.return_value = tls_context kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance_tls]) - kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client) + kafka_consumer_check._create_kafka_client(clazz=kafka_client) assert tls_context.check_hostname is True assert tls_context.tls_cert is not None assert tls_context.check_hostname is True - assert kafka_consumer_check.client.create_kafka_admin_client is not None + assert kafka_consumer_check.create_kafka_client is not None @pytest.mark.parametrize( @@ -108,17 +108,15 @@ def test_oauth_token_client_config(kafka_instance): "client_secret": "secret", } - with mock.patch('kafka.KafkaAdminClient') as kafka_admin_client: - kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance]) - _ = kafka_consumer_check.client._create_kafka_client(clazz=kafka_admin_client) - params = kafka_admin_client.call_args_list[0].kwargs + kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [instance]) + client = kafka_consumer_check.create_kafka_client() - assert params['security_protocol'] == 'SASL_PLAINTEXT' - assert params['sasl_mechanism'] == 'OAUTHBEARER' - assert params['sasl_oauth_token_provider'].reader._client_id == "id" - assert params['sasl_oauth_token_provider'].reader._client_secret == "secret" - assert params['sasl_oauth_token_provider'].reader._url == "http://fake.url" - assert isinstance(params['sasl_oauth_token_provider'], OAuthTokenProvider) + assert client.config['security_protocol'] == 'SASL_PLAINTEXT' + assert client.config['sasl_mechanism'] == 'OAUTHBEARER' + assert isinstance(client.config['sasl_oauth_token_provider'], OAuthTokenProvider) + assert client.config['sasl_oauth_token_provider'].reader._client_id == "id" + assert client.config['sasl_oauth_token_provider'].reader._client_secret == "secret" + assert client.config['sasl_oauth_token_provider'].reader._url == "http://fake.url" @pytest.mark.parametrize( @@ -224,8 +222,8 @@ def test_version_metadata(datadog_agent, kafka_instance, dd_run_check): kafka_consumer_check = KafkaCheck('kafka_consumer', {}, [kafka_instance]) kafka_consumer_check.check_id = 'test:123' - kafka_client = kafka_consumer_check.client.create_kafka_admin_client() - version_data = [str(part) for part in kafka_client._client.check_version()] + kafka_client = kafka_consumer_check.create_kafka_client() + version_data = [str(part) for part in kafka_client.check_version()] kafka_client.close() version_parts = {'version.{}'.format(name): part for name, part in zip(('major', 'minor', 'patch'), version_data)} version_parts['version.scheme'] = 'semver'