From 17144faa2b83a2be78e7e6a67c97269238310263 Mon Sep 17 00:00:00 2001 From: Daniel Ferrochio Date: Wed, 15 May 2024 19:22:48 +0000 Subject: [PATCH 1/5] add return statements to confluent kafka producer flush and poll methods --- .../opentelemetry/instrumentation/confluent_kafka/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index c869d03dd9..30181d39c2 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -151,10 +151,10 @@ def __init__(self, producer: Producer, tracer: Tracer): self._tracer = tracer def flush(self, timeout=-1): - self._producer.flush(timeout) + return self._producer.flush(timeout) def poll(self, timeout=-1): - self._producer.poll(timeout) + return self._producer.poll(timeout) def produce( self, topic, value=None, *args, **kwargs From aaccb968baf870995a9749a13509f5dff33cc29f Mon Sep 17 00:00:00 2001 From: Daniel Ferrochio Date: Wed, 15 May 2024 20:51:19 +0000 Subject: [PATCH 2/5] add unittests --- .../tests/test_instrumentation.py | 34 +++++++++++++++++- .../tests/utils.py | 36 +++++++++++++++++-- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 21d5bd6f83..b83ccf86b9 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -31,7 +31,7 @@ ) from opentelemetry.test.test_base import TestBase -from .utils import MockConsumer, MockedMessage +from .utils import MockConsumer, MockedMessage, MockedProducer class TestConfluentKafka(TestBase): @@ -246,3 +246,35 @@ def _compare_spans(self, spans, expected_spans): self.assertEqual( expected_attribute_value, span.attributes[attribute_key] ) + + def test_producer_poll(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + message_queue = [] + + producer = MockedProducer( + message_queue, + { + "bootstrap.servers": "localhost:29092", + }, + ) + + producer = instrumentation.instrument_producer(producer) + producer.produce(topic="topic-1", key="key-1", value="value-1") + msg = producer.poll() + self.assertIsNotNone(msg) + + def test_producer_flush(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + message_queue = [] + + producer = MockedProducer( + message_queue, + { + "bootstrap.servers": "localhost:29092", + }, + ) + + producer = instrumentation.instrument_producer(producer) + producer.produce(topic="topic-1", key="key-1", value="value-1") + msg = producer.flush() + self.assertIsNotNone(msg) \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index 798daaeff4..0fc3bf696d 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -1,4 +1,4 @@ -from confluent_kafka import Consumer +from confluent_kafka import Consumer, Producer class MockConsumer(Consumer): @@ -20,7 +20,7 @@ def poll(self, timeout=None): class MockedMessage: - def __init__(self, topic: str, partition: int, offset: int, headers): + def __init__(self, topic: str, partition: int, offset: int, headers, key=None, value=None): self._topic = topic self._partition = partition self._offset = offset @@ -37,3 +37,35 @@ def offset(self): def headers(self): return self._headers + + def key(self): + return self._key + + def value(self): + return self._value + + +class MockedProducer(Producer): + def __init__(self, queue, config): + self._queue = queue + super().__init__(config) + + def produce( + self, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg + self._queue.append( + MockedMessage( + topic=kwargs.get("topic"), + partition=0, + offset=0, + headers=[], + key=kwargs.get("key"), + value=kwargs.get("value") + ) + ) + + def poll(self, timeout=None): + return len(self._queue) + + def flush(self, timeout=None): + return len(self._queue) From 3df70cdb25987d803ea362aab3693d6d00ce5a1d Mon Sep 17 00:00:00 2001 From: Daniel Ferrochio Date: Thu, 16 May 2024 13:28:52 +0000 Subject: [PATCH 3/5] fix unittests --- .../tests/utils.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index 0fc3bf696d..f16bb26b86 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -1,5 +1,5 @@ from confluent_kafka import Consumer, Producer - +from typing import Optional class MockConsumer(Consumer): def __init__(self, queue, config): @@ -20,11 +20,21 @@ def poll(self, timeout=None): class MockedMessage: - def __init__(self, topic: str, partition: int, offset: int, headers, key=None, value=None): + def __init__( + self, + topic: str, + partition: int, + offset: int, + headers, + key: Optional[str]=None, + value=Optional[str]=None + ): self._topic = topic self._partition = partition self._offset = offset self._headers = headers + self._key = key + self._value = value def topic(self): return self._topic From a33309976816ee53d0e8316b8e4035912f557aa9 Mon Sep 17 00:00:00 2001 From: Daniel Ferrochio Date: Thu, 16 May 2024 14:15:24 +0000 Subject: [PATCH 4/5] fix lint --- .../tests/test_instrumentation.py | 2 +- .../tests/utils.py | 26 ++++++++++--------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index b83ccf86b9..205de27733 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -277,4 +277,4 @@ def test_producer_flush(self) -> None: producer = instrumentation.instrument_producer(producer) producer.produce(topic="topic-1", key="key-1", value="value-1") msg = producer.flush() - self.assertIsNotNone(msg) \ No newline at end of file + self.assertIsNotNone(msg) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index f16bb26b86..92e11798f6 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -1,6 +1,8 @@ -from confluent_kafka import Consumer, Producer from typing import Optional +from confluent_kafka import Consumer, Producer + + class MockConsumer(Consumer): def __init__(self, queue, config): self._queue = queue @@ -21,14 +23,14 @@ def poll(self, timeout=None): class MockedMessage: def __init__( - self, - topic: str, - partition: int, - offset: int, - headers, - key: Optional[str]=None, - value=Optional[str]=None - ): + self, + topic: str, + partition: int, + offset: int, + headers, + key: Optional[str] = None, + value: Optional[str] = None, + ): self._topic = topic self._partition = partition self._offset = offset @@ -70,12 +72,12 @@ def produce( offset=0, headers=[], key=kwargs.get("key"), - value=kwargs.get("value") + value=kwargs.get("value"), ) ) - def poll(self, timeout=None): + def poll(self, *args, **kwargs): return len(self._queue) - def flush(self, timeout=None): + def flush(self, *args, **kwargs): return len(self._queue) From 23952d494b952c1a7dc03e931d5ce4fc9176e394 Mon Sep 17 00:00:00 2001 From: Daniel Ferrochio Date: Thu, 16 May 2024 14:53:56 +0000 Subject: [PATCH 5/5] update CHANGELOG.md --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d10983c10b..acbb96dd5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking changes +- Add return statement to Confluent kafka Producer poll() and flush() calls when instrumented by ConfluentKafkaInstrumentor().instrument_producer() ([#2527](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2527)) - Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi` ([#2300](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2300)) - Rename AwsLambdaInstrumentor span attributes `faas.id` to `cloud.resource_id`, `faas.execution` to `faas.invocation_id` @@ -124,7 +125,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1959](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1959)) - `opentelemetry-resource-detector-azure` Added dependency for Cloud Resource ID attribute ([#2072](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2072)) - + ## Version 1.21.0/0.42b0 (2023-11-01) ### Added