From 0f791f9e683e6c0941bb5e39b2aa94e62be0b1c5 Mon Sep 17 00:00:00 2001 From: Toomas Ormisson Date: Sat, 13 Jul 2024 22:17:50 +0200 Subject: [PATCH] messaging.rabbitmq.destination.routing_key --- .../aio_pika/callback_decorator.py | 5 ++++- .../aio_pika/publish_decorator.py | 5 ++++- .../instrumentation/aio_pika/span_builder.py | 22 ++++++++++++++----- .../tests/consts.py | 4 ++++ .../tests/test_callback_decorator.py | 15 +++++++++---- .../tests/test_publish_decorator.py | 14 ++++++++---- .../tests/test_span_builder.py | 6 ++++- 7 files changed, 55 insertions(+), 16 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py index f10415bdd2..d9e7779dcc 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py @@ -32,7 +32,10 @@ def _get_span(self, message: AbstractIncomingMessage) -> Optional[Span]: builder = SpanBuilder(self._tracer) builder.set_as_consumer() builder.set_operation(MessagingOperationValues.RECEIVE) - builder.set_destination(message.exchange or message.routing_key) + builder.set_destination( + exchange_name=message.exchange, + routing_key=message.routing_key, + ) builder.set_channel(self._queue.channel) builder.set_message(message) return builder.build() diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py index 03937290ee..242971a1f9 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py @@ -32,7 +32,10 @@ def _get_publish_span( ) -> Optional[Span]: builder = SpanBuilder(self._tracer) builder.set_as_producer() - builder.set_destination(f"{self._exchange.name},{routing_key}") + builder.set_destination( + exchange_name=self._exchange.name, + routing_key=routing_key, + ) builder.set_channel(self._exchange.channel) builder.set_message(message) return builder.build() diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index 5147881bdd..19289ec7cc 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -29,12 +29,18 @@ class SpanBuilder: + """ + https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes + https://opentelemetry.io/docs/specs/semconv/messaging/rabbitmq/#rabbitmq-attributes + """ + def __init__(self, tracer: Tracer): self._tracer = tracer self._attributes = _DEFAULT_ATTRIBUTES.copy() self._operation: MessagingOperationValues = None self._kind: SpanKind = None - self._destination: str = None + self._exchange_name: str = None + self._routing_key: str = None def set_as_producer(self): self._kind = SpanKind.PRODUCER @@ -45,11 +51,17 @@ def set_as_consumer(self): def set_operation(self, operation: MessagingOperationValues): self._operation = operation - def set_destination(self, destination: str): - self._destination = destination + def set_destination(self, exchange_name: str, routing_key: str): + self._exchange_name = exchange_name + self._routing_key = routing_key + # messaging.destination.name MUST be set to the name of the exchange. self._attributes[SpanAttributes.MESSAGING_DESTINATION_NAME] = ( - destination + exchange_name ) + if routing_key: + self._attributes[ + SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY + ] = routing_key def set_channel(self, channel: AbstractChannel): if hasattr(channel, "_connection"): @@ -102,4 +114,4 @@ def build(self) -> Optional[Span]: def _generate_span_name(self) -> str: operation_value = self._operation.value if self._operation else "send" - return f"{self._destination} {operation_value}" + return f"{self._exchange_name},{self._routing_key} {operation_value}" diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py index 431780ae7b..442b276c77 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -21,10 +21,14 @@ CONNECTION_8 = Namespace(url=SERVER_URL) CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None) CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None) + MESSAGE = Namespace( + # https://github.com/mosquito/aio-pika/blob/master/aio_pika/abc.py + # https://aio-pika.readthedocs.io/en/latest/_modules/aio_pika/message.html properties=Namespace( message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={} ), exchange=EXCHANGE_NAME, + routing_key=ROUTING_KEY, headers={}, ) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py index 07df16fd3a..9baa469947 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py @@ -19,9 +19,13 @@ from opentelemetry.instrumentation.aio_pika.callback_decorator import ( CallbackDecorator, ) +from opentelemetry.semconv._incubating.attributes import ( + messaging_attributes as SpanAttributes, +) +from opentelemetry.semconv._incubating.attributes import ( + net_attributes as NetAttributes, +) from opentelemetry.trace import SpanKind, get_tracer -from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes -from opentelemetry.semconv._incubating.attributes import net_attributes as NetAttributes from .consts import ( AIOPIKA_VERSION_INFO, @@ -33,6 +37,7 @@ MESSAGE_ID, MESSAGING_SYSTEM, QUEUE_NAME, + ROUTING_KEY, SERVER_HOST, SERVER_PORT, ) @@ -43,6 +48,7 @@ class TestInstrumentedQueueAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME, + SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY, NetAttributes.NET_PEER_NAME: SERVER_HOST, NetAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, @@ -60,7 +66,7 @@ def test_get_callback_span(self): tracer = mock.MagicMock() CallbackDecorator(tracer, queue)._get_span(MESSAGE) tracer.start_span.assert_called_once_with( - f"{EXCHANGE_NAME} receive", + f"{EXCHANGE_NAME},{ROUTING_KEY} receive", kind=SpanKind.CONSUMER, attributes=self.EXPECTED_ATTRIBUTES, ) @@ -83,6 +89,7 @@ class TestInstrumentedQueueAioRmq8(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME, + SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY, NetAttributes.NET_PEER_NAME: SERVER_HOST, NetAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, @@ -100,7 +107,7 @@ def test_get_callback_span(self): tracer = mock.MagicMock() CallbackDecorator(tracer, queue)._get_span(MESSAGE) tracer.start_span.assert_called_once_with( - f"{EXCHANGE_NAME} receive", + f"{EXCHANGE_NAME},{ROUTING_KEY} receive", kind=SpanKind.CONSUMER, attributes=self.EXPECTED_ATTRIBUTES, ) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index 5b5a062b8d..e53ea2b04c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -21,10 +21,14 @@ from opentelemetry.instrumentation.aio_pika.publish_decorator import ( PublishDecorator, ) +from opentelemetry.semconv._incubating.attributes import ( + messaging_attributes as SpanAttributes, +) +from opentelemetry.semconv._incubating.attributes import ( + net_attributes as NetAttributes, +) from opentelemetry.trace import SpanKind, get_tracer -from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes -from opentelemetry.semconv._incubating.attributes import net_attributes as NetAttributes from .consts import ( AIOPIKA_VERSION_INFO, CHANNEL_7, @@ -46,7 +50,8 @@ class TestInstrumentedExchangeAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, - SpanAttributes.MESSAGING_DESTINATION_NAME: f"{EXCHANGE_NAME},{ROUTING_KEY}", # TODO: FIXIT - split name & key + SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME, + SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY, NetAttributes.NET_PEER_NAME: SERVER_HOST, NetAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, @@ -125,7 +130,8 @@ def test_publish_works_with_not_recording_span_robust(self): class TestInstrumentedExchangeAioRmq8(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, - SpanAttributes.MESSAGING_DESTINATION_NAME: f"{EXCHANGE_NAME},{ROUTING_KEY}", # TODO: fixit + SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME, + SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY, NetAttributes.NET_PEER_NAME: SERVER_HOST, NetAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py index a4a1d8ec8b..33fbddd724 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py @@ -16,11 +16,15 @@ from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder from opentelemetry.trace import Span, get_tracer +from .consts import EXCHANGE_NAME, ROUTING_KEY + class TestBuilder(TestCase): def test_build(self): builder = SpanBuilder(get_tracer(__name__)) builder.set_as_consumer() - builder.set_destination("destination") + builder.set_destination( + exchange_name=EXCHANGE_NAME, routing_key=ROUTING_KEY + ) span = builder.build() self.assertTrue(isinstance(span, Span))