Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ep2024-sprints][gh-1638] - Bump aio-pika schema version from 1.11.0 to 1.26.0 #2705

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]
dependencies = [
"opentelemetry-api ~= 1.5",
"opentelemetry-api ~= 1.25",
"opentelemetry-instrumentation == 0.47b0.dev",
"wrapt >= 1.0.0, < 2.0.0",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def _instrument(self, **kwargs):
_INSTRUMENTATION_MODULE_NAME,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url="https://opentelemetry.io/schemas/1.26.0",
)
self._instrument_queue(tracer)
self._instrument_exchange(tracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
from aio_pika.abc import AbstractChannel, AbstractMessage

from opentelemetry.instrumentation.utils import is_instrumentation_enabled
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
from opentelemetry.semconv._incubating.attributes import (
messaging_attributes as SpanAttributes,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the new semconv package reorganization people are just importing the variables straight from their module, I guess for the long names. In this case something like:

from opentelemetry.semconv._incubating.attributes.messaging_attributes import MESSAGING_DESTINATION_NAME, MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, MESSAGING_MESSAGE_ID, MESSAGING_MESSAGE_CONVERSATION_ID, SpanAttributes.MESSAGING_OPERATION_TYPE, SpanAttributes.MESSAGING_OPERATION_TYPE]
from opentelemetry.semconv._incubating.attributes.net_attributes import NET_PEER_NAME, NET_PEER_PORT

)
from opentelemetry.semconv._incubating.attributes import (
net_attributes as NetAttributes,
)
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.trace import Span, SpanKind, Tracer

_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: "rabbitmq"}
Expand All @@ -44,7 +47,9 @@ def set_operation(self, operation: MessagingOperationValues):

def set_destination(self, destination: str):
self._destination = destination
self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination
self._attributes[SpanAttributes.MESSAGING_DESTINATION_NAME] = (
destination
)

def set_channel(self, channel: AbstractChannel):
if hasattr(channel, "_connection"):
Expand All @@ -61,31 +66,37 @@ def set_channel(self, channel: AbstractChannel):
url = connection.url
self._attributes.update(
{
SpanAttributes.NET_PEER_NAME: url.host,
SpanAttributes.NET_PEER_PORT: url.port or 5672,
NetAttributes.NET_PEER_NAME: url.host,
NetAttributes.NET_PEER_PORT: url.port or 5672,
}
)

def set_message(self, message: AbstractMessage):
properties = message.properties
if properties.message_id:
self._attributes[
SpanAttributes.MESSAGING_MESSAGE_ID
] = properties.message_id
self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = (
properties.message_id
)
if properties.correlation_id:
self._attributes[
SpanAttributes.MESSAGING_CONVERSATION_ID
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID
] = properties.correlation_id

def build(self) -> Optional[Span]:
if not is_instrumentation_enabled():
return None
if self._operation:
self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value
self._attributes[SpanAttributes.MESSAGING_OPERATION_TYPE] = (
self._operation.value
)
else:
self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True
self._attributes[
SpanAttributes.MESSAGING_DESTINATION_TEMPORARY
] = True
span = self._tracer.start_span(
self._generate_span_name(), kind=self._kind, attributes=self._attributes
self._generate_span_name(),
kind=self._kind,
attributes=self._attributes,
)
return span

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
from opentelemetry.instrumentation.aio_pika.callback_decorator import (
CallbackDecorator,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes
from opentelemetry.semconv._incubating.attributes import net_attributes as NetAttributes

from .consts import (
AIOPIKA_VERSION_INFO,
Expand All @@ -41,12 +42,12 @@
class TestInstrumentedQueueAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME,
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION: "receive",
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION_TYPE: "receive",
}

def setUp(self):
Expand Down Expand Up @@ -81,12 +82,12 @@ def test_decorate_callback(self):
class TestInstrumentedQueueAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_DESTINATION_NAME: EXCHANGE_NAME,
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION: "receive",
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_OPERATION_TYPE: "receive",
}

def setUp(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
from opentelemetry.instrumentation.aio_pika.publish_decorator import (
PublishDecorator,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer

from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes
from opentelemetry.semconv._incubating.attributes import net_attributes as NetAttributes
from .consts import (
AIOPIKA_VERSION_INFO,
CHANNEL_7,
Expand All @@ -45,12 +46,12 @@
class TestInstrumentedExchangeAioRmq7(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_DESTINATION_NAME: f"{EXCHANGE_NAME},{ROUTING_KEY}", # TODO: FIXIT - split name & key
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_DESTINATION_TEMPORARY: True,
}

def setUp(self):
Expand Down Expand Up @@ -124,12 +125,12 @@ def test_publish_works_with_not_recording_span_robust(self):
class TestInstrumentedExchangeAioRmq8(TestCase):
EXPECTED_ATTRIBUTES = {
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_DESTINATION_NAME: f"{EXCHANGE_NAME},{ROUTING_KEY}", # TODO: fixit
NetAttributes.NET_PEER_NAME: SERVER_HOST,
NetAttributes.NET_PEER_PORT: SERVER_PORT,
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
SpanAttributes.MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID,
SpanAttributes.MESSAGING_DESTINATION_TEMPORARY: True,
}

def setUp(self):
Expand Down
Loading