From feaf06956068dbaa5c91ab82cb5f1a3cd77de63e Mon Sep 17 00:00:00 2001 From: Andrew Zhang Date: Fri, 3 Mar 2023 10:36:36 -0500 Subject: [PATCH] Remove collect_broker_version (#14095) * Remove collect_broker_version * Remove commented out code --- .../client/confluent_kafka_client.py | 3 --- .../kafka_consumer/client/generic_kafka_client.py | 7 ------- .../kafka_consumer/client/kafka_client.py | 4 ---- .../kafka_consumer/client/kafka_python_client.py | 3 --- .../kafka_consumer/kafka_consumer.py | 11 ----------- .../tests/python_client/test_integration.py | 15 --------------- 6 files changed, 43 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py index 09ad7b1b44823..00237cc8eff0f 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/confluent_kafka_client.py @@ -40,9 +40,6 @@ def get_partitions_for_topic(self, topic): def request_metadata_update(self): raise NotImplementedError - def collect_broker_version(self): - raise NotImplementedError - def get_consumer_offsets(self): raise NotImplementedError diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py index e186877eed599..109b3b2e2b158 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/generic_kafka_client.py @@ -59,13 +59,6 @@ def request_metadata_update(self): # return self.confluent_kafka_client.request_metadata_update() return self.python_kafka_client.request_metadata_update() - def collect_broker_version(self): - # TODO when this method is implemented in ConfluentKafkaClient, replace this with: - # if self.use_legacy_client: - # return self.python_kafka_client.collect_broker_version() - # return self.confluent_kafka_client.collect_broker_version() - return self.python_kafka_client.collect_broker_version() - def get_consumer_offsets_dict(self): # TODO when this method is implemented in ConfluentKafkaClient, replace this with: # if self.use_legacy_client: diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py index 2d8742f0f82bb..464d69cac3c8b 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_client.py @@ -44,7 +44,3 @@ def get_partitions_for_topic(self, topic): @abstractmethod def request_metadata_update(self): pass - - @abstractmethod - def collect_broker_version(self): - pass diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py index e6394a6ee35a3..bb9ae343708d1 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client/kafka_python_client.py @@ -198,9 +198,6 @@ def kafka_client(self): self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version) return self._kafka_client - def collect_broker_version(self): - return self.kafka_client._client.check_version() - def _highwater_offsets_callback(self, response): """Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict.""" if type(response) not in OffsetResponse: diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 2d6ebe19b332f..30b87c5185d27 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -73,8 +73,6 @@ def check(self, _): consumer_offsets, highwater_offsets, self._context_limit - len(highwater_offsets) ) - self.collect_broker_metadata() - def report_highwater_offsets(self, highwater_offsets, contexts_limit): """Report the broker highwater offsets.""" reported_contexts = 0 @@ -150,15 +148,6 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c self.log.warning(msg, consumer_group, topic, partition) self.client.request_metadata_update() # force metadata update on next poll() - @AgentCheck.metadata_entrypoint - def collect_broker_metadata(self): - version_data = [str(part) for part in self.client.collect_broker_version()] - version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)} - - self.set_metadata( - 'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts - ) - def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'): """Emit an event to the Datadog Event Stream.""" event_dict = { diff --git a/kafka_consumer/tests/python_client/test_integration.py b/kafka_consumer/tests/python_client/test_integration.py index 9f7ea820cdc07..d109456ba7a4c 100644 --- a/kafka_consumer/tests/python_client/test_integration.py +++ b/kafka_consumer/tests/python_client/test_integration.py @@ -58,21 +58,6 @@ def test_no_partitions(aggregator, check, kafka_instance, dd_run_check): assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}}) -def test_version_metadata(datadog_agent, check, kafka_instance, dd_run_check): - kafka_consumer_check = check(kafka_instance) - kafka_consumer_check.check_id = 'test:123' - - kafka_client = kafka_consumer_check.client.create_kafka_admin_client() - version_data = [str(part) for part in kafka_client._client.check_version()] - kafka_client.close() - version_parts = {f'version.{name}': part for name, part in zip(('major', 'minor', 'patch'), version_data)} - version_parts['version.scheme'] = 'semver' - version_parts['version.raw'] = '.'.join(version_data) - - dd_run_check(kafka_consumer_check) - datadog_agent.assert_metadata('test:123', version_parts) - - @pytest.mark.parametrize( 'is_enabled, metric_count, topic_tags', [