Skip to content

Commit

Permalink
messaging.rabbitmq.destination.routing_key
Browse files Browse the repository at this point in the history
  • Loading branch information
P6rguVyrst committed Jul 13, 2024
1 parent 02f02ad commit 0f791f9
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def _get_span(self, message: AbstractIncomingMessage) -> Optional[Span]:
builder = SpanBuilder(self._tracer)
builder.set_as_consumer()
builder.set_operation(MessagingOperationValues.RECEIVE)
builder.set_destination(message.exchange or message.routing_key)
builder.set_destination(
exchange_name=message.exchange,
routing_key=message.routing_key,
)
builder.set_channel(self._queue.channel)
builder.set_message(message)
return builder.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ def _get_publish_span(
) -> Optional[Span]:
builder = SpanBuilder(self._tracer)
builder.set_as_producer()
builder.set_destination(f"{self._exchange.name},{routing_key}")
builder.set_destination(
exchange_name=self._exchange.name,
routing_key=routing_key,
)
builder.set_channel(self._exchange.channel)
builder.set_message(message)
return builder.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@


class SpanBuilder:
"""
https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
https://opentelemetry.io/docs/specs/semconv/messaging/rabbitmq/#rabbitmq-attributes
"""

def __init__(self, tracer: Tracer):
self._tracer = tracer
self._attributes = _DEFAULT_ATTRIBUTES.copy()
self._operation: MessagingOperationValues = None
self._kind: SpanKind = None
self._destination: str = None
self._exchange_name: str = None
self._routing_key: str = None

def set_as_producer(self):
self._kind = SpanKind.PRODUCER
Expand All @@ -45,11 +51,17 @@ def set_as_consumer(self):
def set_operation(self, operation: MessagingOperationValues):
self._operation = operation

def set_destination(self, destination: str):
self._destination = destination
def set_destination(self, exchange_name: str, routing_key: str):
self._exchange_name = exchange_name
self._routing_key = routing_key
# messaging.destination.name MUST be set to the name of the exchange.
self._attributes[SpanAttributes.MESSAGING_DESTINATION_NAME] = (
destination
exchange_name
)
if routing_key:
self._attributes[
SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY
] = routing_key

def set_channel(self, channel: AbstractChannel):
if hasattr(channel, "_connection"):
Expand Down Expand Up @@ -102,4 +114,4 @@ def build(self) -> Optional[Span]:

def _generate_span_name(self) -> str:
operation_value = self._operation.value if self._operation else "send"
return f"{self._destination} {operation_value}"
return f"{self._exchange_name},{self._routing_key} {operation_value}"
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
CONNECTION_8 = Namespace(url=SERVER_URL)
CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None)
CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None)

MESSAGE = Namespace(
# https://github.com/mosquito/aio-pika/blob/master/aio_pika/abc.py
# https://aio-pika.readthedocs.io/en/latest/_modules/aio_pika/message.html
properties=Namespace(
message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={}
),
exchange=EXCHANGE_NAME,
routing_key=ROUTING_KEY,
headers={},
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
from opentelemetry.instrumentation.aio_pika.callback_decorator import (
CallbackDecorator,
)
from opentelemetry.semconv._incubating.attributes import (
messaging_attributes as SpanAttributes,
)
from opentelemetry.semconv._incubating.attributes import (
net_attributes as NetAttributes,
)
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes
from opentelemetry.semconv._incubating.attributes import net_attributes as NetAttributes

from .consts import (
AIOPIKA_VERSION_INFO,
Expand All @@ -33,6 +37,7 @@
MESSAGE_ID,
MESSAGING_SYSTEM,
QUEUE_NAME,
ROUTING_KEY,
SERVER_HOST,
SERVER_PORT,
)
Expand All @@ -43,6 +48,7 @@ class TestInstrumentedQueueAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME,
SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY,
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
Expand All @@ -60,7 +66,7 @@ def test_get_callback_span(self):
tracer = mock.MagicMock()
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
tracer.start_span.assert_called_once_with(
f"{EXCHANGE_NAME} receive",
f"{EXCHANGE_NAME},{ROUTING_KEY} receive",
kind=SpanKind.CONSUMER,
attributes=self.EXPECTED_ATTRIBUTES,
)
Expand All @@ -83,6 +89,7 @@ class TestInstrumentedQueueAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME,
SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY,
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
Expand All @@ -100,7 +107,7 @@ def test_get_callback_span(self):
tracer = mock.MagicMock()
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
tracer.start_span.assert_called_once_with(
f"{EXCHANGE_NAME} receive",
f"{EXCHANGE_NAME},{ROUTING_KEY} receive",
kind=SpanKind.CONSUMER,
attributes=self.EXPECTED_ATTRIBUTES,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
from opentelemetry.instrumentation.aio_pika.publish_decorator import (
PublishDecorator,
)
from opentelemetry.semconv._incubating.attributes import (
messaging_attributes as SpanAttributes,
)
from opentelemetry.semconv._incubating.attributes import (
net_attributes as NetAttributes,
)
from opentelemetry.trace import SpanKind, get_tracer

from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes
from opentelemetry.semconv._incubating.attributes import net_attributes as NetAttributes
from .consts import (
AIOPIKA_VERSION_INFO,
CHANNEL_7,
Expand All @@ -46,7 +50,8 @@
class TestInstrumentedExchangeAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION_NAME: f"{EXCHANGE_NAME},{ROUTING_KEY}", # TODO: FIXIT - split name & key
SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME,
SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY,
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
Expand Down Expand Up @@ -125,7 +130,8 @@ def test_publish_works_with_not_recording_span_robust(self):
class TestInstrumentedExchangeAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION_NAME: f"{EXCHANGE_NAME},{ROUTING_KEY}", # TODO: fixit
SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME,
SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY: ROUTING_KEY,
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder
from opentelemetry.trace import Span, get_tracer

from .consts import EXCHANGE_NAME, ROUTING_KEY


class TestBuilder(TestCase):
def test_build(self):
builder = SpanBuilder(get_tracer(__name__))
builder.set_as_consumer()
builder.set_destination("destination")
builder.set_destination(
exchange_name=EXCHANGE_NAME, routing_key=ROUTING_KEY
)
span = builder.build()
self.assertTrue(isinstance(span, Span))

0 comments on commit 0f791f9

Please sign in to comment.