Skip to content

Commit

Permalink
Remove collect_broker_version (#14095)
Browse files Browse the repository at this point in the history
* Remove collect_broker_version

* Remove commented out code
  • Loading branch information
yzhan289 authored Mar 3, 2023
1 parent 74e3b19 commit feaf069
Show file tree
Hide file tree
Showing 6 changed files with 0 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ def get_partitions_for_topic(self, topic):
def request_metadata_update(self):
raise NotImplementedError

def collect_broker_version(self):
raise NotImplementedError

def get_consumer_offsets(self):
raise NotImplementedError

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,6 @@ def request_metadata_update(self):
# return self.confluent_kafka_client.request_metadata_update()
return self.python_kafka_client.request_metadata_update()

def collect_broker_version(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
# return self.python_kafka_client.collect_broker_version()
# return self.confluent_kafka_client.collect_broker_version()
return self.python_kafka_client.collect_broker_version()

def get_consumer_offsets_dict(self):
# TODO when this method is implemented in ConfluentKafkaClient, replace this with:
# if self.use_legacy_client:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,3 @@ def get_partitions_for_topic(self, topic):
@abstractmethod
def request_metadata_update(self):
pass

@abstractmethod
def collect_broker_version(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,6 @@ def kafka_client(self):
self._kafka_client = self._create_kafka_admin_client(api_version=kafka_version)
return self._kafka_client

def collect_broker_version(self):
return self.kafka_client._client.check_version()

def _highwater_offsets_callback(self, response):
"""Callback that parses an OffsetFetchResponse and saves it to the highwater_offsets dict."""
if type(response) not in OffsetResponse:
Expand Down
11 changes: 0 additions & 11 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ def check(self, _):
consumer_offsets, highwater_offsets, self._context_limit - len(highwater_offsets)
)

self.collect_broker_metadata()

def report_highwater_offsets(self, highwater_offsets, contexts_limit):
"""Report the broker highwater offsets."""
reported_contexts = 0
Expand Down Expand Up @@ -150,15 +148,6 @@ def report_consumer_offsets_and_lag(self, consumer_offsets, highwater_offsets, c
self.log.warning(msg, consumer_group, topic, partition)
self.client.request_metadata_update() # force metadata update on next poll()

@AgentCheck.metadata_entrypoint
def collect_broker_metadata(self):
version_data = [str(part) for part in self.client.collect_broker_version()]
version_parts = {name: part for name, part in zip(('major', 'minor', 'patch'), version_data)}

self.set_metadata(
'version', '.'.join(version_data), scheme='parts', final_scheme='semver', part_map=version_parts
)

def send_event(self, title, text, tags, event_type, aggregation_key, severity='info'):
"""Emit an event to the Datadog Event Stream."""
event_dict = {
Expand Down
15 changes: 0 additions & 15 deletions kafka_consumer/tests/python_client/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,6 @@ def test_no_partitions(aggregator, check, kafka_instance, dd_run_check):
assert_check_kafka(aggregator, {'my_consumer': {'marvel': [0]}})


def test_version_metadata(datadog_agent, check, kafka_instance, dd_run_check):
kafka_consumer_check = check(kafka_instance)
kafka_consumer_check.check_id = 'test:123'

kafka_client = kafka_consumer_check.client.create_kafka_admin_client()
version_data = [str(part) for part in kafka_client._client.check_version()]
kafka_client.close()
version_parts = {f'version.{name}': part for name, part in zip(('major', 'minor', 'patch'), version_data)}
version_parts['version.scheme'] = 'semver'
version_parts['version.raw'] = '.'.join(version_data)

dd_run_check(kafka_consumer_check)
datadog_agent.assert_metadata('test:123', version_parts)


@pytest.mark.parametrize(
'is_enabled, metric_count, topic_tags',
[
Expand Down

0 comments on commit feaf069

Please sign in to comment.