Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add confluent kafka producer poll and flush returns #2527

Merged
merged 9 commits into from
May 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Breaking changes

- Add return statement to Confluent kafka Producer poll() and flush() calls when instrumented by ConfluentKafkaInstrumentor().instrument_producer() ([#2527](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2527))
- Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi`
([#2300](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2300))
- Rename AwsLambdaInstrumentor span attributes `faas.id` to `cloud.resource_id`, `faas.execution` to `faas.invocation_id`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ def __init__(self, producer: Producer, tracer: Tracer):
self._tracer = tracer

def flush(self, timeout=-1):
self._producer.flush(timeout)
return self._producer.flush(timeout)

def poll(self, timeout=-1):
self._producer.poll(timeout)
return self._producer.poll(timeout)

def produce(
self, topic, value=None, *args, **kwargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
)
from opentelemetry.test.test_base import TestBase

from .utils import MockConsumer, MockedMessage
from .utils import MockConsumer, MockedMessage, MockedProducer


class TestConfluentKafka(TestBase):
Expand Down Expand Up @@ -246,3 +246,35 @@ def _compare_spans(self, spans, expected_spans):
self.assertEqual(
expected_attribute_value, span.attributes[attribute_key]
)

def test_producer_poll(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []

producer = MockedProducer(
message_queue,
{
"bootstrap.servers": "localhost:29092",
},
)

producer = instrumentation.instrument_producer(producer)
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.poll()
self.assertIsNotNone(msg)

def test_producer_flush(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
message_queue = []

producer = MockedProducer(
message_queue,
{
"bootstrap.servers": "localhost:29092",
},
)

producer = instrumentation.instrument_producer(producer)
producer.produce(topic="topic-1", key="key-1", value="value-1")
msg = producer.flush()
self.assertIsNotNone(msg)
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from confluent_kafka import Consumer
from typing import Optional

from confluent_kafka import Consumer, Producer


class MockConsumer(Consumer):
Expand All @@ -20,11 +22,21 @@ def poll(self, timeout=None):


class MockedMessage:
def __init__(self, topic: str, partition: int, offset: int, headers):
def __init__(
self,
topic: str,
partition: int,
offset: int,
headers,
key: Optional[str] = None,
value: Optional[str] = None,
):
self._topic = topic
self._partition = partition
self._offset = offset
self._headers = headers
self._key = key
self._value = value

def topic(self):
return self._topic
Expand All @@ -37,3 +49,35 @@ def offset(self):

def headers(self):
return self._headers

def key(self):
return self._key

def value(self):
return self._value


class MockedProducer(Producer):
def __init__(self, queue, config):
self._queue = queue
super().__init__(config)

def produce(
self, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg
self._queue.append(
MockedMessage(
topic=kwargs.get("topic"),
partition=0,
offset=0,
headers=[],
key=kwargs.get("key"),
value=kwargs.get("value"),
)
)

def poll(self, *args, **kwargs):
return len(self._queue)

def flush(self, *args, **kwargs):
return len(self._queue)
Loading