From cef28d6f24e06c5161da911458c171201ab0fd31 Mon Sep 17 00:00:00 2001 From: Allen Kim Date: Tue, 22 Oct 2024 17:22:07 +0900 Subject: [PATCH] Fix to allow topic to be passed via kwargs (#2901) * Fix to allow topic to be imported from kwargs * add changelog * lint * separate assert function --- CHANGELOG.md | 2 ++ .../instrumentation/confluent_kafka/__init__.py | 4 +++- .../instrumentation/confluent_kafka/utils.py | 6 ++---- .../tests/test_instrumentation.py | 15 +++++++++++++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 775043035f..075f1c0a57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__` ([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874)) +- `opentelemetry-instrumentation-confluent-kafka` Fix to allow `topic` to be extracted from `kwargs` in `produce()` + ([#2901])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2901) ### Breaking changes diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 3d1cc79c93..95a14627b3 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -363,7 +363,9 @@ def wrap_produce(func, instance, tracer, args, kwargs): headers = [] kwargs["headers"] = headers - topic = KafkaPropertiesExtractor.extract_produce_topic(args) + topic = KafkaPropertiesExtractor.extract_produce_topic( + args, kwargs + ) _enrich_span( span, topic, diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index 4769f2a88f..60dc13e675 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -25,11 +25,9 @@ def _extract_argument(key, position, default_value, args, kwargs): return kwargs.get(key, default_value) @staticmethod - def extract_produce_topic(args): + def extract_produce_topic(args, kwargs): """extract topic from `produce` method arguments in Producer class""" - if len(args) > 0: - return args[0] - return "unknown" + return kwargs.get("topic") or (args[0] if args else "unknown") @staticmethod def extract_produce_headers(args, kwargs): diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 27653d6777..986116900d 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -284,6 +284,15 @@ def _compare_spans(self, spans, expected_spans): expected_attribute_value, span.attributes[attribute_key] ) + def _assert_topic(self, span, expected_topic: str) -> None: + self.assertEqual( + span.attributes[SpanAttributes.MESSAGING_DESTINATION], + expected_topic, + ) + + def _assert_span_count(self, span_list, expected_count: int) -> None: + self.assertEqual(len(span_list), expected_count) + def test_producer_poll(self) -> None: instrumentation = ConfluentKafkaInstrumentor() message_queue = [] @@ -299,6 +308,9 @@ def test_producer_poll(self) -> None: producer.produce(topic="topic-1", key="key-1", value="value-1") msg = producer.poll() self.assertIsNotNone(msg) + span_list = self.memory_exporter.get_finished_spans() + self._assert_span_count(span_list, 1) + self._assert_topic(span_list[0], "topic-1") def test_producer_flush(self) -> None: instrumentation = ConfluentKafkaInstrumentor() @@ -315,3 +327,6 @@ def test_producer_flush(self) -> None: producer.produce(topic="topic-1", key="key-1", value="value-1") msg = producer.flush() self.assertIsNotNone(msg) + span_list = self.memory_exporter.get_finished_spans() + self._assert_span_count(span_list, 1) + self._assert_topic(span_list[0], "topic-1")