diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index dac8e82c8db4..aabd8579cd0d 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -10,6 +10,16 @@ ### Other Changes + - Updated tracing ([#29995](https://github.com/Azure/azure-sdk-for-python/pull/29995)): + - Additional attributes added to existing spans: + - `messaging.system` - messaging system (i.e., `servicebus`) + - `messaging.operation` - type of operation (i.e., `publish`, `receive`, or `settle`) + - `messaging.batch.message_count` - number of messages sent or received (if more than one) + - A span will now be created upon calls to the service that settle messages. + - The span name will contain the settlement operation (e.g., `ServiceBus.complete`) + - The span will contain `az.namespace`, `messaging.destination.name`, `net.peer.name`, `messaging.system`, and `messaging.operation` attributes. + - All `send` spans now contain links to `message` spans. Now, `message` spans will no longer contain a link to the `send` span. + ## 7.10.0b1 (2023-04-13) ### Features Added diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py index dd720704dc29..d00dc7a896ee 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py @@ -32,11 +32,6 @@ TOKEN_TYPE_SASTOKEN, MGMT_REQUEST_OP_TYPE_ENTITY_MGMT, ASSOCIATEDLINKPROPERTYNAME, - TRACE_NAMESPACE_PROPERTY, - TRACE_COMPONENT_PROPERTY, - TRACE_COMPONENT, - TRACE_PEER_ADDRESS_PROPERTY, - TRACE_BUS_DESTINATION_PROPERTY, ) if TYPE_CHECKING: @@ -543,12 +538,6 @@ def _mgmt_request_response_with_retry( **kwargs ) - def _add_span_request_attributes(self, span): - span.add_attribute(TRACE_COMPONENT_PROPERTY, TRACE_COMPONENT) - span.add_attribute(TRACE_NAMESPACE_PROPERTY, TRACE_NAMESPACE_PROPERTY) - span.add_attribute(TRACE_BUS_DESTINATION_PROPERTY, self._entity_path) - span.add_attribute(TRACE_PEER_ADDRESS_PROPERTY, self.fully_qualified_namespace) - def _open(self): # pylint: disable=no-self-use raise ValueError("Subclass should override the method.") diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py index 1636be80a5e9..204895a17475 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py @@ -137,31 +137,6 @@ "ServiceBusDlqSupplementaryAuthorization" ) -# Distributed Tracing Constants - -TRACE_COMPONENT_PROPERTY = "component" -TRACE_COMPONENT = "servicebus" - -TRACE_NAMESPACE_PROPERTY = "az.namespace" -TRACE_NAMESPACE = "ServiceBus" - -SPAN_NAME_RECEIVE = TRACE_NAMESPACE + ".receive" -SPAN_NAME_RECEIVE_DEFERRED = TRACE_NAMESPACE + ".receive_deferred" -SPAN_NAME_PEEK = TRACE_NAMESPACE + ".peek" -SPAN_NAME_SEND = TRACE_NAMESPACE + ".send" -SPAN_NAME_SCHEDULE = TRACE_NAMESPACE + ".schedule" -SPAN_NAME_MESSAGE = TRACE_NAMESPACE + ".message" - -TRACE_BUS_DESTINATION_PROPERTY = "message_bus.destination" -TRACE_PEER_ADDRESS_PROPERTY = "peer.address" - -SPAN_ENQUEUED_TIME_PROPERTY = "enqueuedTime" - -TRACE_ENQUEUED_TIME_PROPERTY = b"x-opt-enqueued-time" -TRACE_PARENT_PROPERTY = b"Diagnostic-Id" -TRACE_PROPERTY_ENCODING = "ascii" - - MAX_MESSAGE_LENGTH_BYTES = 1024 * 1024 # Backcompat with uAMQP MESSAGE_PROPERTY_MAX_LENGTH = 128 # .NET TimeSpan.MaxValue: 10675199.02:48:05.4775807 diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 4fcb54574058..68fc61d16963 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -46,9 +46,9 @@ from .utils import ( utc_from_timestamp, utc_now, - trace_message, transform_outbound_messages, ) +from .tracing import trace_message if TYPE_CHECKING: try: @@ -60,7 +60,6 @@ except ImportError: pass from .._pyamqp.performatives import TransferFrame - from azure.core.tracing import AbstractSpan from ..aio._servicebus_receiver_async import ( ServiceBusReceiver as AsyncServiceBusReceiver, ) @@ -659,6 +658,7 @@ def __init__( **kwargs: Any ) -> None: self._amqp_transport = kwargs.pop("amqp_transport", PyamqpTransport) + self._tracing_attributes: Dict[str, Union[str, int]] = kwargs.pop("tracing_attributes", {}) self._max_size_in_bytes = max_size_in_bytes or MAX_MESSAGE_LENGTH_BYTES self._message = self._amqp_transport.build_batch_message([]) @@ -676,15 +676,11 @@ def __repr__(self) -> str: def __len__(self) -> int: return self._count - def _from_list(self, messages: Iterable[ServiceBusMessage], parent_span: Optional["AbstractSpan"] = None) -> None: + def _from_list(self, messages: Iterable[ServiceBusMessage]) -> None: for message in messages: - self._add(message, parent_span) + self._add(message) - def _add( - self, - add_message: Union[ServiceBusMessage, Mapping[str, Any], AmqpAnnotatedMessage], - parent_span: Optional["AbstractSpan"] = None - ) -> None: + def _add(self, add_message: Union[ServiceBusMessage, Mapping[str, Any], AmqpAnnotatedMessage]) -> None: """Actual add implementation. The shim exists to hide the internal parameters such as parent_span.""" outgoing_sb_message = transform_outbound_messages( add_message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message @@ -694,8 +690,8 @@ def _add( outgoing_sb_message._message = trace_message( outgoing_sb_message._message, amqp_transport=self._amqp_transport, - parent_span=parent_span - ) # parent_span is e.g. if built as part of a send operation. + additional_attributes=self._tracing_attributes + ) message_size = self._amqp_transport.get_message_encoded_size( outgoing_sb_message._message # pylint: disable=protected-access ) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py new file mode 100644 index 000000000000..142be9ffdaa0 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/tracing.py @@ -0,0 +1,302 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +from __future__ import annotations +from contextlib import contextmanager +from enum import Enum +import logging +from typing import ( + Dict, + Iterable, + Iterator, + List, + Optional, + Type, + TYPE_CHECKING, + Union, + cast, +) + +from azure.core import CaseInsensitiveEnumMeta +from azure.core.settings import settings +from azure.core.tracing import SpanKind, Link + +if TYPE_CHECKING: + try: + # pylint:disable=unused-import + from uamqp import Message as uamqp_Message + except ImportError: + uamqp_Message = None + from azure.core.tracing import AbstractSpan + + from .._pyamqp.message import Message as pyamqp_Message + from .message import ( + ServiceBusReceivedMessage, + ServiceBusMessage, + ServiceBusMessageBatch + ) + from .._base_handler import BaseHandler + from ..aio._base_handler_async import BaseHandler as BaseHandlerAsync + from .._servicebus_receiver import ServiceBusReceiver + from ..aio._servicebus_receiver_async import ServiceBusReceiver as ServiceBusReceiverAsync + from .._servicebus_sender import ServiceBusSender + from ..aio._servicebus_sender_async import ServiceBusSender as ServiceBusSenderAsync + from .._transport._base import AmqpTransport + from ..aio._transport._base_async import AmqpTransportAsync + + ReceiveMessageTypes = Union[ + ServiceBusReceivedMessage, + pyamqp_Message, + uamqp_Message + ] + +TRACE_DIAGNOSTIC_ID_PROPERTY = b"Diagnostic-Id" +TRACE_ENQUEUED_TIME_PROPERTY = b"x-opt-enqueued-time" +TRACE_PARENT_PROPERTY = b"traceparent" +TRACE_STATE_PROPERTY = b"tracestate" +TRACE_PROPERTY_ENCODING = "ascii" + +SPAN_ENQUEUED_TIME_PROPERTY = "enqueuedTime" + +SPAN_NAME_RECEIVE = "ServiceBus.receive" +SPAN_NAME_RECEIVE_DEFERRED = "ServiceBus.receive_deferred" +SPAN_NAME_PEEK = "ServiceBus.peek" +SPAN_NAME_SEND = "ServiceBus.send" +SPAN_NAME_SCHEDULE = "ServiceBus.schedule" +SPAN_NAME_MESSAGE = "ServiceBus.message" + + +_LOGGER = logging.getLogger(__name__) + + +class TraceAttributes: + TRACE_NAMESPACE_ATTRIBUTE = "az.namespace" + TRACE_NAMESPACE = "Microsoft.ServiceBus" + + TRACE_MESSAGING_SYSTEM_ATTRIBUTE = "messaging.system" + TRACE_MESSAGING_SYSTEM = "servicebus" + + TRACE_NET_PEER_NAME_ATTRIBUTE = "net.peer.name" + TRACE_MESSAGING_DESTINATION_ATTRIBUTE = "messaging.destination.name" + TRACE_MESSAGING_OPERATION_ATTRIBUTE = "messaging.operation" + TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE = "messaging.batch.message_count" + + LEGACY_TRACE_COMPONENT_ATTRIBUTE = "component" + LEGACY_TRACE_MESSAGE_BUS_DESTINATION_ATTRIBUTE = "message_bus.destination" + LEGACY_TRACE_PEER_ADDRESS_ATTRIBUTE = "peer.address" + + +class TraceOperationTypes(str, Enum, metaclass=CaseInsensitiveEnumMeta): + PUBLISH = "publish" + RECEIVE = "receive" + SETTLE = "settle" + + +def is_tracing_enabled(): + span_impl_type = settings.tracing_implementation() + return span_impl_type is not None + + +@contextmanager +def send_trace_context_manager( + sender: Union[ServiceBusSender, ServiceBusSenderAsync], + span_name: str = SPAN_NAME_SEND, + links: Optional[List[Link]] = None +) -> Iterator[None]: + """Tracing for sending messages.""" + span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() + + if span_impl_type is not None: + links = links or [] + with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links) as span: + add_span_attributes(span, TraceOperationTypes.PUBLISH, sender, message_count=len(links)) + yield + else: + yield + + +@contextmanager +def receive_trace_context_manager( + receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync], + span_name: str = SPAN_NAME_RECEIVE, + links: Optional[List[Link]] = None, + start_time: Optional[int] = None +) -> Iterator[None]: + """Tracing for receiving messages.""" + span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() + if span_impl_type is not None: + links = links or [] + with span_impl_type(name=span_name, kind=SpanKind.CLIENT, links=links, start_time=start_time) as span: + add_span_attributes(span, TraceOperationTypes.RECEIVE, receiver, message_count=len(links)) + yield + else: + yield + + +@contextmanager +def settle_trace_context_manager( + receiver: Union[ServiceBusReceiver, ServiceBusReceiverAsync], + operation: str, + links: Optional[List[Link]] = None +): + """Tracing for settling messages.""" + span_impl_type = settings.tracing_implementation() + if span_impl_type is not None: + links = links or [] + with span_impl_type(name=f"ServiceBus.{operation}", kind=SpanKind.CLIENT, links=links) as span: + add_span_attributes(span, TraceOperationTypes.SETTLE, receiver) + yield + else: + yield + + +def trace_message( + message: Union[uamqp_Message, pyamqp_Message], + amqp_transport: Union[AmqpTransport, AmqpTransportAsync], + additional_attributes: Optional[Dict[str, Union[str, int]]] = None +) -> Union["uamqp_Message", "pyamqp_Message"]: + """Adds tracing information to the message and returns the updated message. + + Will open and close a message span, and add tracing context to the app properties of the message. + """ + try: + span_impl_type: Type[AbstractSpan] = settings.tracing_implementation() + if span_impl_type is not None: + with span_impl_type(name=SPAN_NAME_MESSAGE, kind=SpanKind.PRODUCER) as message_span: + headers = message_span.to_header() + + if "traceparent" in headers: + message = amqp_transport.update_message_app_properties( + message, + TRACE_DIAGNOSTIC_ID_PROPERTY, + headers["traceparent"] + ) + message = amqp_transport.update_message_app_properties( + message, + TRACE_PARENT_PROPERTY, + headers["traceparent"] + ) + + if "tracestate" in headers: + message = amqp_transport.update_message_app_properties( + message, + TRACE_STATE_PROPERTY, + headers["tracestate"] + ) + + message_span.add_attribute( + TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE, TraceAttributes.TRACE_NAMESPACE + ) + message_span.add_attribute( + TraceAttributes.TRACE_MESSAGING_SYSTEM_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM + ) + + if additional_attributes: + for key, value in additional_attributes.items(): + if value is not None: + message_span.add_attribute(key, value) + + except Exception as exp: # pylint:disable=broad-except + _LOGGER.warning("trace_message had an exception %r", exp) + + return message + + +def get_receive_links(messages: Union[ReceiveMessageTypes, Iterable[ReceiveMessageTypes]]) -> List[Link]: + if not is_tracing_enabled(): + return [] + + trace_messages = ( + messages if isinstance(messages, Iterable) # pylint:disable=isinstance-second-argument-not-valid-type + else (messages,) + ) + + links = [] + try: + for message in trace_messages: + if message.application_properties: + headers = {} + + traceparent = message.application_properties.get(TRACE_PARENT_PROPERTY, b"") + if hasattr(traceparent, "decode"): + traceparent = traceparent.decode(TRACE_PROPERTY_ENCODING) + if traceparent: + headers["traceparent"] = cast(str, traceparent) + + tracestate = message.application_properties.get(TRACE_STATE_PROPERTY, b"") + if hasattr(tracestate, "decode"): + tracestate = tracestate.decode(TRACE_PROPERTY_ENCODING) + if tracestate: + headers["tracestate"] = cast(str, tracestate) + + enqueued_time = message.raw_amqp_message.annotations.get(TRACE_ENQUEUED_TIME_PROPERTY) + attributes = {SPAN_ENQUEUED_TIME_PROPERTY: enqueued_time} if enqueued_time else None + + if headers: + links.append(Link(headers, attributes=attributes)) + except AttributeError: + pass + return links + + +def get_span_links_from_batch(batch: ServiceBusMessageBatch) -> List[Link]: + """Create span links from a batch of messages.""" + links = [] + for message in batch._messages: # pylint: disable=protected-access + link = get_span_link_from_message(message._message) # pylint: disable=protected-access + if link: + links.append(link) + return links + + +def get_span_link_from_message(message: Union[uamqp_Message, pyamqp_Message, ServiceBusMessage]) -> Optional[Link]: + """Create a span link from a message. + + This will extract the traceparent and tracestate from the message application properties and create span links + based on these values. + """ + headers = {} + try: + if message.application_properties: + traceparent = message.application_properties.get(TRACE_PARENT_PROPERTY, b"") + if hasattr(traceparent, "decode"): + traceparent = traceparent.decode(TRACE_PROPERTY_ENCODING) + if traceparent: + headers["traceparent"] = cast(str, traceparent) + + tracestate = message.application_properties.get(TRACE_STATE_PROPERTY, b"") + if hasattr(tracestate, "decode"): + tracestate = tracestate.decode(TRACE_PROPERTY_ENCODING) + if tracestate: + headers["tracestate"] = cast(str, tracestate) + except AttributeError : + return None + return Link(headers) if headers else None + + +def add_span_attributes( + span: AbstractSpan, + operation_type: TraceOperationTypes, + handler: Union[BaseHandler, BaseHandlerAsync], + message_count: int = 0 +) -> None: + """Add attributes to span based on the operation type.""" + + span.add_attribute(TraceAttributes.TRACE_NAMESPACE_ATTRIBUTE, TraceAttributes.TRACE_NAMESPACE) + span.add_attribute(TraceAttributes.TRACE_MESSAGING_SYSTEM_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM) + span.add_attribute(TraceAttributes.TRACE_MESSAGING_OPERATION_ATTRIBUTE, operation_type) + + if message_count > 1: + span.add_attribute(TraceAttributes.TRACE_MESSAGING_BATCH_COUNT_ATTRIBUTE, message_count) + + if operation_type in (TraceOperationTypes.PUBLISH, TraceOperationTypes.RECEIVE): + # Maintain legacy attributes for backwards compatibility. + span.add_attribute(TraceAttributes.LEGACY_TRACE_COMPONENT_ATTRIBUTE, TraceAttributes.TRACE_MESSAGING_SYSTEM) + span.add_attribute(TraceAttributes.LEGACY_TRACE_MESSAGE_BUS_DESTINATION_ATTRIBUTE, handler._entity_name) # pylint: disable=protected-access + span.add_attribute(TraceAttributes.LEGACY_TRACE_PEER_ADDRESS_ATTRIBUTE, handler.fully_qualified_namespace) + + elif operation_type == TraceOperationTypes.SETTLE: + span.add_attribute(TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE, handler.fully_qualified_namespace) + span.add_attribute(TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE, handler._entity_name) # pylint: disable=protected-access diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 8ce4a9ad5374..7753ea849e4c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -3,7 +3,6 @@ # Licensed under the MIT License. See License.txt in the project root for # license information. # ------------------------------------------------------------------------- - import sys import datetime import logging @@ -12,8 +11,6 @@ from typing import ( Any, Dict, - Iterable, - Iterator, List, Mapping, Optional, @@ -24,7 +21,6 @@ cast, Callable ) -from contextlib import contextmanager from datetime import timezone try: @@ -32,9 +28,6 @@ except ImportError: from urllib.parse import urlparse -from azure.core.settings import settings -from azure.core.tracing import SpanKind, Link - from .._version import VERSION from .constants import ( JWT_TOKEN_SCOPE, @@ -43,15 +36,6 @@ DEAD_LETTER_QUEUE_SUFFIX, TRANSFER_DEAD_LETTER_QUEUE_SUFFIX, USER_AGENT_PREFIX, - SPAN_NAME_SEND, - SPAN_NAME_MESSAGE, - TRACE_PARENT_PROPERTY, - TRACE_NAMESPACE, - TRACE_NAMESPACE_PROPERTY, - TRACE_PROPERTY_ENCODING, - TRACE_ENQUEUED_TIME_PROPERTY, - SPAN_ENQUEUED_TIME_PROPERTY, - SPAN_NAME_RECEIVE, ) from ..amqp import AmqpAnnotatedMessage @@ -59,24 +43,16 @@ try: # pylint:disable=unused-import from uamqp import ( - Message as uamqp_Message, types as uamqp_types ) from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth except ImportError: pass - from .._pyamqp.message import Message as pyamqp_Message from .._pyamqp.authentication import JWTTokenAuth as pyamqp_JWTTokenAuth - from .message import ( - ServiceBusReceivedMessage, - ServiceBusMessage, - ) - from azure.core.tracing import AbstractSpan + from .message import ServiceBusReceivedMessage, ServiceBusMessage from azure.core.credentials import AzureSasCredential - from .receiver_mixins import ReceiverMixin from .._servicebus_session import BaseSession from .._transport._base import AmqpTransport - from ..aio._transport._base_async import AmqpTransportAsync MessagesType = Union[ Mapping[str, Any], @@ -275,91 +251,6 @@ def strip_protocol_from_uri(uri: str) -> str: return uri -@contextmanager -def send_trace_context_manager(span_name=SPAN_NAME_SEND): - span_impl_type: Type["AbstractSpan"] = settings.tracing_implementation() - - if span_impl_type is not None: - with span_impl_type(name=span_name, kind=SpanKind.CLIENT) as child: - yield child - else: - yield None - - -@contextmanager -def receive_trace_context_manager( - receiver: "ReceiverMixin", - span_name: str = SPAN_NAME_RECEIVE, - links: Optional[List[Link]] = None -) -> Iterator[None]: - """Tracing""" - span_impl_type: Type["AbstractSpan"] = settings.tracing_implementation() - if span_impl_type is None: - yield - else: - receive_span = span_impl_type(name=span_name, kind=SpanKind.CONSUMER, links=links) - receiver._add_span_request_attributes(receive_span) # type: ignore # pylint: disable=protected-access - - with receive_span: - yield - - -def trace_message( - message: Union["uamqp_Message", "pyamqp_Message"], - amqp_transport: Union["AmqpTransport", "AmqpTransportAsync"], - parent_span: Optional["AbstractSpan"] = None -) -> Union["uamqp_Message", "pyamqp_Message"]: - """Add tracing information to this message. - Will open and close a "Azure.Servicebus.message" span, and - add the "DiagnosticId" as app properties of the message. - """ - try: - span_impl_type: Type["AbstractSpan"] = settings.tracing_implementation() - if span_impl_type is not None: - current_span = parent_span or span_impl_type( - span_impl_type.get_current_span() - ) - link = Link({ - 'traceparent': current_span.get_trace_parent() - }) - with current_span.span(name=SPAN_NAME_MESSAGE, kind=SpanKind.PRODUCER, links=[link]) as message_span: - message_span.add_attribute(TRACE_NAMESPACE_PROPERTY, TRACE_NAMESPACE) - message = amqp_transport.update_message_app_properties( - message, - TRACE_PARENT_PROPERTY, - message_span.get_trace_parent().encode(TRACE_PROPERTY_ENCODING), - ) - except Exception as exp: # pylint:disable=broad-except - _log.warning("trace_message had an exception %r", exp) - - return message - - -def get_receive_links(messages): - trace_messages = ( - messages if isinstance(messages, Iterable) # pylint:disable=isinstance-second-argument-not-valid-type - else (messages,) - ) - - links = [] - try: - for message in trace_messages: # type: ignore - if message.application_properties: - traceparent = message.application_properties.get( - TRACE_PARENT_PROPERTY, "" - ).decode(TRACE_PROPERTY_ENCODING) - if traceparent: - links.append(Link({'traceparent': traceparent}, - { - SPAN_ENQUEUED_TIME_PROPERTY: message.raw_amqp_message.annotations.get( - TRACE_ENQUEUED_TIME_PROPERTY - ) - })) - except AttributeError: - pass - return links - - def parse_sas_credential(credential: "AzureSasCredential") -> Tuple: sas = credential.signature parsed_sas = sas.split('&') diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 45b184a83cd0..0fe7bb974b60 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -16,10 +16,14 @@ from .exceptions import ServiceBusError from ._base_handler import BaseHandler from ._common.message import ServiceBusReceivedMessage -from ._common.utils import ( - create_authentication, +from ._common.utils import create_authentication +from ._common.tracing import ( get_receive_links, receive_trace_context_manager, + settle_trace_context_manager, + get_span_link_from_message, + SPAN_NAME_RECEIVE_DEFERRED, + SPAN_NAME_PEEK, ) from ._common.constants import ( CONSUMER_IDENTIFIER, @@ -34,8 +38,6 @@ MGMT_REQUEST_RECEIVER_SETTLE_MODE, MGMT_REQUEST_FROM_SEQUENCE_NUMBER, MGMT_REQUEST_MAX_MESSAGE_COUNT, - SPAN_NAME_RECEIVE_DEFERRED, - SPAN_NAME_PEEK, MESSAGE_COMPLETE, MESSAGE_ABANDON, MESSAGE_DEFER, @@ -468,17 +470,18 @@ def _settle_message_with_retry( message="The lock on the message lock has expired.", error=message.auto_renew_error, ) - - self._do_retryable_operation( - self._settle_message, - timeout=None, - message=message, - settle_operation=settle_operation, - dead_letter_reason=dead_letter_reason, - dead_letter_error_description=dead_letter_error_description, - ) - - message._settled = True + link = get_span_link_from_message(message) + trace_links = [link] if link else [] + with settle_trace_context_manager(self, settle_operation, links=trace_links): + self._do_retryable_operation( + self._settle_message, + timeout=None, + message=message, + settle_operation=settle_operation, + dead_letter_reason=dead_letter_reason, + dead_letter_error_description=dead_letter_error_description, + ) + message._settled = True def _settle_message( self, @@ -656,6 +659,7 @@ def receive_messages( raise ValueError("The max_wait_time must be greater than 0.") if max_message_count is not None and max_message_count <= 0: raise ValueError("The max_message_count must be greater than 0") + start_time = time.time_ns() messages = self._do_retryable_operation( self._receive, max_message_count=max_message_count, @@ -663,7 +667,7 @@ def receive_messages( operation_requires_timeout=True, ) links = get_receive_links(messages) - with receive_trace_context_manager(self, links=links): + with receive_trace_context_manager(self, links=links, start_time=start_time): if ( self._auto_lock_renewer and not self._session @@ -732,6 +736,7 @@ def receive_deferred_messages( receiver=self, amqp_transport=self._amqp_transport, ) + start_time = time.time_ns() messages = self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, message, @@ -740,7 +745,7 @@ def receive_deferred_messages( ) links = get_receive_links(messages) with receive_trace_context_manager( - self, span_name=SPAN_NAME_RECEIVE_DEFERRED, links=links + self, span_name=SPAN_NAME_RECEIVE_DEFERRED, links=links, start_time=start_time ): if ( self._auto_lock_renewer @@ -800,11 +805,12 @@ def peek_messages( self._populate_message_properties(message) handler = functools.partial(mgmt_handlers.peek_op, receiver=self, amqp_transport=self._amqp_transport) + start_time = time.time_ns() messages = self._mgmt_request_response_with_retry( REQUEST_RESPONSE_PEEK_OPERATION, message, handler, timeout=timeout ) links = get_receive_links(messages) - with receive_trace_context_manager(self, span_name=SPAN_NAME_PEEK, links=links): + with receive_trace_context_manager(self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time): return messages def complete_message(self, message: ServiceBusReceivedMessage) -> None: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 94a23f47450a..41a31b40bfbf 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -16,11 +16,15 @@ ServiceBusMessageBatch, ) from .amqp import AmqpAnnotatedMessage -from ._common.utils import ( - create_authentication, - transform_outbound_messages, +from ._common.utils import create_authentication, transform_outbound_messages +from ._common.tracing import ( send_trace_context_manager, trace_message, + is_tracing_enabled, + get_span_links_from_batch, + get_span_link_from_message, + SPAN_NAME_SCHEDULE, + TraceAttributes, ) from ._common.constants import ( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, @@ -31,8 +35,7 @@ MGMT_REQUEST_MESSAGES, MGMT_REQUEST_MESSAGE_ID, MGMT_REQUEST_PARTITION_KEY, - SPAN_NAME_SCHEDULE, - MAX_MESSAGE_LENGTH_BYTES + MAX_MESSAGE_LENGTH_BYTES, ) if TYPE_CHECKING: @@ -78,8 +81,9 @@ def _create_attribute(self, **kwargs): self.entity_name = self._entity_name @classmethod - def _build_schedule_request(cls, schedule_time_utc, send_span, amqp_transport, *messages): + def _build_schedule_request(cls, schedule_time_utc, amqp_transport, tracing_attributes, *messages): request_body = {MGMT_REQUEST_MESSAGES: []} + trace_links = [] for message in messages: if not isinstance(message, ServiceBusMessage): raise ValueError( @@ -93,7 +97,17 @@ def _build_schedule_request(cls, schedule_time_utc, send_span, amqp_transport, * to_outgoing_amqp_message=amqp_transport.to_outgoing_amqp_message ) # pylint: disable=protected-access - message._message = trace_message(message._message, amqp_transport=amqp_transport, parent_span=send_span) + message._message = trace_message( + message._message, + amqp_transport=amqp_transport, + additional_attributes=tracing_attributes + ) + + if is_tracing_enabled(): + link = get_span_link_from_message(message._message) + if link: + trace_links.append(link) + message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id if message.session_id: @@ -104,7 +118,7 @@ def _build_schedule_request(cls, schedule_time_utc, send_span, amqp_transport, * amqp_transport.encode_message(message) ) request_body[MGMT_REQUEST_MESSAGES].append(message_data) - return request_body + return request_body, trace_links class ServiceBusSender(BaseHandler, SenderMixin): @@ -301,19 +315,28 @@ def schedule_messages( if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") - with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE) as send_span: - if isinstance(obj_messages, ServiceBusMessage): - request_body = self._build_schedule_request( - schedule_time_utc, send_span, self._amqp_transport, obj_messages - ) - else: - if len(obj_messages) == 0: - return [] # No-op on empty list. - request_body = self._build_schedule_request( - schedule_time_utc, send_span, self._amqp_transport, *obj_messages - ) - if send_span: - self._add_span_request_attributes(send_span) + tracing_attributes = { + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } + if isinstance(obj_messages, ServiceBusMessage): + request_body, trace_links = self._build_schedule_request( + schedule_time_utc, + self._amqp_transport, + tracing_attributes, + obj_messages + ) + else: + if len(obj_messages) == 0: + return [] # No-op on empty list. + request_body, trace_links = self._build_schedule_request( + schedule_time_utc, + self._amqp_transport, + tracing_attributes, + *obj_messages + ) + + with send_trace_context_manager(self, span_name=SPAN_NAME_SCHEDULE, links=trace_links): return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, @@ -417,34 +440,45 @@ def send_messages( pass obj_message: Union[ServiceBusMessage, ServiceBusMessageBatch] - with send_trace_context_manager() as send_span: - if isinstance(message, ServiceBusMessageBatch): - # If AmqpTransports are not the same, create batch with correct BatchMessage. - if self._amqp_transport.KIND != message._amqp_transport.KIND: # pylint: disable=protected-access - # pylint: disable=protected-access - batch = self.create_message_batch() - batch._from_list(message._messages, send_span) # type: ignore - obj_message = batch - else: - obj_message = message + + if isinstance(message, ServiceBusMessageBatch): + # If AmqpTransports are not the same, create batch with correct BatchMessage. + if self._amqp_transport.KIND != message._amqp_transport.KIND: # pylint: disable=protected-access + # pylint: disable=protected-access + batch = self.create_message_batch() + batch._from_list(message._messages) # type: ignore + obj_message = batch else: - obj_message = transform_outbound_messages( # type: ignore - message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message + obj_message = message + else: + obj_message = transform_outbound_messages( # type: ignore + message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message + ) + try: + batch = self.create_message_batch() + batch._from_list(obj_message) # type: ignore # pylint: disable=protected-access + obj_message = batch + except TypeError: # Message was not a list or generator. Do needed tracing. + # pylint: disable=protected-access + obj_message._message = trace_message( + obj_message._message, + amqp_transport=self._amqp_transport, + additional_attributes={ + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } ) - try: - batch = self.create_message_batch() - batch._from_list(obj_message, send_span) # type: ignore # pylint: disable=protected-access - obj_message = batch - except TypeError: # Message was not a list or generator. Do needed tracing. - # pylint: disable=protected-access - obj_message._message = trace_message( - obj_message._message, - amqp_transport=self._amqp_transport, - parent_span=send_span - ) - - if send_span: - self._add_span_request_attributes(send_span) + + trace_links = [] + if is_tracing_enabled(): + if isinstance(obj_message, ServiceBusMessageBatch): + trace_links = get_span_links_from_batch(obj_message) + else: + link = get_span_link_from_message(obj_message._message) # pylint: disable=protected-access + if link: + trace_links.append(link) + + with send_trace_context_manager(self, links=trace_links): self._do_retryable_operation( self._send, message=obj_message, @@ -485,7 +519,12 @@ def create_message_batch( ) return ServiceBusMessageBatch( - max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), amqp_transport=self._amqp_transport + max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), + amqp_transport=self._amqp_transport, + tracing_attributes = { + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } ) @property diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py index b7be99aff957..3d2b4e443f85 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py @@ -34,12 +34,8 @@ from .._pyamqp._connection import Connection, _CLOSING_STATES from ._base import AmqpTransport -from .._common.utils import ( - utc_from_timestamp, - utc_now, - get_receive_links, - receive_trace_context_manager -) +from .._common.utils import utc_from_timestamp, utc_now +from .._common.tracing import get_receive_links, receive_trace_context_manager from .._common.constants import ( PYAMQP_LIBRARY, DATETIMEOFFSET_EPOCH, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py index b0c5a808488f..04bdb9f9e52c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py @@ -54,12 +54,8 @@ ) from ._base import AmqpTransport from ..amqp._constants import AmqpMessageBodyType - from .._common.utils import ( - utc_from_timestamp, - utc_now, - get_receive_links, - receive_trace_context_manager, - ) + from .._common.utils import utc_from_timestamp, utc_now + from .._common.tracing import get_receive_links, receive_trace_context_manager from .._common.constants import ( UAMQP_LIBRARY, DATETIMEOFFSET_EPOCH, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index d0c58325bc92..061f276512b4 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -386,11 +386,6 @@ async def _mgmt_request_response_with_retry( **kwargs ) - def _add_span_request_attributes(self, span): - return BaseHandlerSync._add_span_request_attributes( # pylint: disable=protected-access - self, span - ) - async def _open(self): # pylint: disable=no-self-use raise ValueError("Subclass should override the method.") diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 8b1cdbb70849..59199dfce7cb 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -9,6 +9,7 @@ import datetime import functools import logging +import time import warnings from enum import Enum from typing import Any, List, Optional, AsyncIterator, Union, TYPE_CHECKING, cast @@ -40,14 +41,16 @@ MGMT_REQUEST_DEAD_LETTER_REASON, MGMT_REQUEST_DEAD_LETTER_ERROR_DESCRIPTION, MGMT_RESPONSE_MESSAGE_EXPIRATION, - SPAN_NAME_RECEIVE_DEFERRED, - SPAN_NAME_PEEK, ) from .._common import mgmt_handlers -from .._common.utils import ( +from .._common.utils import utc_from_timestamp +from .._common.tracing import ( receive_trace_context_manager, - utc_from_timestamp, + settle_trace_context_manager, get_receive_links, + get_span_link_from_message, + SPAN_NAME_RECEIVE_DEFERRED, + SPAN_NAME_PEEK, ) from ._async_utils import create_authentication @@ -449,16 +452,18 @@ async def _settle_message_with_retry( message="The lock on the message lock has expired.", error=message.auto_renew_error, ) - - await self._do_retryable_operation( - self._settle_message, - timeout=None, - message=message, - settle_operation=settle_operation, - dead_letter_reason=dead_letter_reason, - dead_letter_error_description=dead_letter_error_description, - ) - message._settled = True + link = get_span_link_from_message(message) + trace_links = [link] if link else [] + with settle_trace_context_manager(self, settle_operation, links=trace_links): + await self._do_retryable_operation( + self._settle_message, + timeout=None, + message=message, + settle_operation=settle_operation, + dead_letter_reason=dead_letter_reason, + dead_letter_error_description=dead_letter_error_description, + ) + message._settled = True async def _settle_message( # type: ignore self, @@ -628,6 +633,7 @@ async def receive_messages( raise ValueError("The max_wait_time must be greater than 0.") if max_message_count is not None and max_message_count <= 0: raise ValueError("The max_message_count must be greater than 0") + start_time = time.time_ns() messages = await self._do_retryable_operation( self._receive, max_message_count=max_message_count, @@ -635,7 +641,7 @@ async def receive_messages( operation_requires_timeout=True, ) links = get_receive_links(messages) - with receive_trace_context_manager(self, links=links): + with receive_trace_context_manager(self, links=links, start_time=start_time): if ( self._auto_lock_renewer and not self._session @@ -700,6 +706,7 @@ async def receive_deferred_messages( receiver=self, amqp_transport=self._amqp_transport, ) + start_time = time.time_ns() messages = await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, message, @@ -708,7 +715,7 @@ async def receive_deferred_messages( ) links = get_receive_links(message) with receive_trace_context_manager( - self, span_name=SPAN_NAME_RECEIVE_DEFERRED, links=links + self, span_name=SPAN_NAME_RECEIVE_DEFERRED, links=links, start_time=start_time ): if ( self._auto_lock_renewer @@ -762,12 +769,13 @@ async def peek_messages( self._populate_message_properties(message) handler = functools.partial(mgmt_handlers.peek_op, receiver=self, amqp_transport=self._amqp_transport) + start_time = time.time_ns() messages = await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_PEEK_OPERATION, message, handler, timeout=timeout ) links = get_receive_links(message) with receive_trace_context_manager( - self, span_name=SPAN_NAME_PEEK, links=links + self, span_name=SPAN_NAME_PEEK, links=links, start_time=start_time ): return messages diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index f8cf9680e6b0..aede9f10716a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -21,14 +21,18 @@ REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, MGMT_REQUEST_SEQUENCE_NUMBERS, - SPAN_NAME_SCHEDULE, - MAX_MESSAGE_LENGTH_BYTES + MAX_MESSAGE_LENGTH_BYTES, ) from .._common import mgmt_handlers -from .._common.utils import ( - transform_outbound_messages, +from .._common.utils import transform_outbound_messages +from .._common.tracing import ( send_trace_context_manager, trace_message, + is_tracing_enabled, + get_span_links_from_batch, + get_span_link_from_message, + SPAN_NAME_SCHEDULE, + TraceAttributes, ) from ._async_utils import create_authentication @@ -259,19 +263,28 @@ async def schedule_messages( ) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") - with send_trace_context_manager(span_name=SPAN_NAME_SCHEDULE) as send_span: - if isinstance(obj_messages, ServiceBusMessage): - request_body = self._build_schedule_request( - schedule_time_utc, send_span, self._amqp_transport, obj_messages - ) - else: - if len(obj_messages) == 0: - return [] # No-op on empty list. - request_body = self._build_schedule_request( - schedule_time_utc, send_span, self._amqp_transport, *obj_messages - ) - if send_span: - self._add_span_request_attributes(send_span) + + tracing_attributes = { + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } + if isinstance(obj_messages, ServiceBusMessage): + request_body, trace_links = self._build_schedule_request( + schedule_time_utc, + self._amqp_transport, + tracing_attributes, + obj_messages + ) + else: + if len(obj_messages) == 0: + return [] # No-op on empty list. + request_body, trace_links = self._build_schedule_request( + schedule_time_utc, + self._amqp_transport, + tracing_attributes, + *obj_messages + ) + with send_trace_context_manager(self, span_name=SPAN_NAME_SCHEDULE, links=trace_links): return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, @@ -376,34 +389,44 @@ async def send_messages( pass obj_message: Union[ServiceBusMessage, ServiceBusMessageBatch] - with send_trace_context_manager() as send_span: - if isinstance(message, ServiceBusMessageBatch): - # If AmqpTransports are not the same, create batch with correct BatchMessage. - if self._amqp_transport is not message._amqp_transport: # pylint: disable=protected-access - # pylint: disable=protected-access - batch = await self.create_message_batch() - batch._from_list(message._messages, send_span) # type: ignore - obj_message = batch - else: - obj_message = message + if isinstance(message, ServiceBusMessageBatch): + # If AmqpTransports are not the same, create batch with correct BatchMessage. + if self._amqp_transport is not message._amqp_transport: # pylint: disable=protected-access + # pylint: disable=protected-access + batch = await self.create_message_batch() + batch._from_list(message._messages) # type: ignore + obj_message = batch else: - obj_message = transform_outbound_messages( # type: ignore - message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message + obj_message = message + else: + obj_message = transform_outbound_messages( # type: ignore + message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message + ) + try: + batch = await self.create_message_batch() + batch._from_list(obj_message) # type: ignore # pylint: disable=protected-access + obj_message = batch + except TypeError: # Message was not a list or generator. + # pylint: disable=protected-access + obj_message._message = trace_message( + obj_message._message, + amqp_transport=self._amqp_transport, + additional_attributes={ + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } ) - try: - batch = await self.create_message_batch() - batch._from_list(obj_message, send_span) # type: ignore # pylint: disable=protected-access - obj_message = batch - except TypeError: # Message was not a list or generator. - # pylint: disable=protected-access - obj_message._message = trace_message( - obj_message._message, - amqp_transport=self._amqp_transport, - parent_span=send_span - ) - - if send_span: - await self._add_span_request_attributes(send_span) + + trace_links = [] + if is_tracing_enabled(): + if isinstance(obj_message, ServiceBusMessageBatch): + trace_links = get_span_links_from_batch(obj_message) + else: + link = get_span_link_from_message(obj_message._message) # pylint: disable=protected-access + if link: + trace_links.append(link) + + with send_trace_context_manager(self, links=trace_links): await self._do_retryable_operation( self._send, message=obj_message, @@ -443,7 +466,12 @@ async def create_message_batch( ) return ServiceBusMessageBatch( - max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), amqp_transport=self._amqp_transport + max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link), + amqp_transport=self._amqp_transport, + tracing_attributes = { + TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, + TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, + } ) @property diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py index f38bb2b9af3e..eec305cb1667 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_pyamqp_transport_async.py @@ -20,12 +20,8 @@ ) from ._base_async import AmqpTransportAsync -from ..._common.utils import ( - utc_from_timestamp, - utc_now, - get_receive_links, - receive_trace_context_manager -) +from ..._common.utils import utc_from_timestamp, utc_now +from ..._common.tracing import get_receive_links, receive_trace_context_manager from ..._common.constants import ( DATETIMEOFFSET_EPOCH, SESSION_LOCKED_UNTIL, diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py index 819eeef32863..b11babed9f85 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_transport/_uamqp_transport_async.py @@ -18,10 +18,7 @@ from ..._transport._uamqp_transport import UamqpTransport from ._base_async import AmqpTransportAsync from .._async_utils import get_running_loop - from ..._common.utils import ( - get_receive_links, - receive_trace_context_manager - ) + from ..._common.tracing import get_receive_links, receive_trace_context_manager from ..._common.constants import ServiceBusReceiveMode if TYPE_CHECKING: diff --git a/sdk/servicebus/azure-servicebus/stress/stress-test-resources.bicep b/sdk/servicebus/azure-servicebus/stress/stress-test-resources.bicep index 0cb2d7a3e335..2bc46e309699 100644 --- a/sdk/servicebus/azure-servicebus/stress/stress-test-resources.bicep +++ b/sdk/servicebus/azure-servicebus/stress/stress-test-resources.bicep @@ -10,6 +10,8 @@ var authorizationRuleName_var = '${baseName}/RootManageSharedAccessKey' var authorizationRuleNameNoManage_var = '${baseName}/NoManage' var serviceBusDataOwnerRoleId = '/subscriptions/${subscription().subscriptionId}/providers/Microsoft.Authorization/roleDefinitions/090c5cfd-751d-490a-894a-3ce6f1109419' +var sbPremiumName = 'sb-premium-${baseName}' + resource servicebus 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = { name: baseName location: location @@ -22,6 +24,16 @@ resource servicebus 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = { } } +resource servicebusPremium 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = { + name: sbPremiumName + location: location + sku: { + name: 'Premium' + tier: 'Premium' + } +} + + resource authorizationRuleName 'Microsoft.ServiceBus/namespaces/AuthorizationRules@2015-08-01' = { name: authorizationRuleName_var location: location @@ -82,26 +94,27 @@ resource testQueue 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { } } -//resource testQueueWithSessions 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { -// parent: servicebus -// name: 'testQueueWithSessions' -// properties: { -// lockDuration: 'PT5M' -// maxSizeInMegabytes: 1024 -// requiresDuplicateDetection: false -// requiresSession: true -// defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' -// deadLetteringOnMessageExpiration: false -// duplicateDetectionHistoryTimeWindow: 'PT10M' -// maxDeliveryCount: 10 -// autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' -// enablePartitioning: false -// enableExpress: false -// } -//} +resource testQueueWithSessions 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { + parent: servicebus + name: 'testQueueWithSessions' + properties: { + lockDuration: 'PT5M' + maxSizeInMegabytes: 1024 + requiresDuplicateDetection: false + requiresSession: true + defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' + deadLetteringOnMessageExpiration: false + duplicateDetectionHistoryTimeWindow: 'PT10M' + maxDeliveryCount: 10 + autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' + enablePartitioning: false + enableExpress: false + } +} -output SERVICE_BUS_CONNECTION_STR string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString -output SERVICE_BUS_QUEUE_NAME string = 'testQueue' -//output QUEUE_NAME_WITH_SESSIONS string = 'testQueueWithSessions' -//output SERVICE_BUS_CONNECTION_STRING_NO_MANAGE string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'NoManage'), apiVersion).primaryConnectionString -//output SERVICE_BUS_ENDPOINT string = replace(servicebus.properties.serviceBusEndpoint, ':443/', '') +output SERVICEBUS_CONNECTION_STRING string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString +output SERVICEBUS_CONNECTION_STRING_NO_MANAGE string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'NoManage'), apiVersion).primaryConnectionString +output SERVICEBUS_CONNECTION_STRING_PREMIUM string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', sbPremiumName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString +output SERVICEBUS_ENDPOINT string = replace(replace(servicebus.properties.serviceBusEndpoint, ':443/', ''), 'https://', '') +output QUEUE_NAME string = 'testQueue' +output QUEUE_NAME_WITH_SESSIONS string = 'testQueueWithSessions' diff --git a/sdk/servicebus/azure-servicebus/stress/test-resources.bicep b/sdk/servicebus/azure-servicebus/stress/test-resources.bicep deleted file mode 100644 index 2bc46e309699..000000000000 --- a/sdk/servicebus/azure-servicebus/stress/test-resources.bicep +++ /dev/null @@ -1,120 +0,0 @@ -@description('The base resource name.') -param baseName string = resourceGroup().name - -@description('The client OID to grant access to test resources.') -param testApplicationOid string - -var apiVersion = '2017-04-01' -var location = resourceGroup().location -var authorizationRuleName_var = '${baseName}/RootManageSharedAccessKey' -var authorizationRuleNameNoManage_var = '${baseName}/NoManage' -var serviceBusDataOwnerRoleId = '/subscriptions/${subscription().subscriptionId}/providers/Microsoft.Authorization/roleDefinitions/090c5cfd-751d-490a-894a-3ce6f1109419' - -var sbPremiumName = 'sb-premium-${baseName}' - -resource servicebus 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = { - name: baseName - location: location - sku: { - name: 'Standard' - tier: 'Standard' - } - properties: { - zoneRedundant: false - } -} - -resource servicebusPremium 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = { - name: sbPremiumName - location: location - sku: { - name: 'Premium' - tier: 'Premium' - } -} - - -resource authorizationRuleName 'Microsoft.ServiceBus/namespaces/AuthorizationRules@2015-08-01' = { - name: authorizationRuleName_var - location: location - properties: { - rights: [ - 'Listen' - 'Manage' - 'Send' - ] - } - dependsOn: [ - servicebus - ] -} - -resource authorizationRuleNameNoManage 'Microsoft.ServiceBus/namespaces/AuthorizationRules@2015-08-01' = { - name: authorizationRuleNameNoManage_var - location: location - properties: { - rights: [ - 'Listen' - 'Send' - ] - } - dependsOn: [ - servicebus - ] -} - - - -resource dataOwnerRoleId 'Microsoft.Authorization/roleAssignments@2018-01-01-preview' = { - name: guid('dataOwnerRoleId${baseName}') - properties: { - roleDefinitionId: serviceBusDataOwnerRoleId - principalId: testApplicationOid - } - dependsOn: [ - servicebus - ] -} - -resource testQueue 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { - parent: servicebus - name: 'testQueue' - properties: { - lockDuration: 'PT5M' - maxSizeInMegabytes: 1024 - requiresDuplicateDetection: false - requiresSession: false - defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' - deadLetteringOnMessageExpiration: false - duplicateDetectionHistoryTimeWindow: 'PT10M' - maxDeliveryCount: 10 - autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' - enablePartitioning: false - enableExpress: false - } -} - -resource testQueueWithSessions 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { - parent: servicebus - name: 'testQueueWithSessions' - properties: { - lockDuration: 'PT5M' - maxSizeInMegabytes: 1024 - requiresDuplicateDetection: false - requiresSession: true - defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' - deadLetteringOnMessageExpiration: false - duplicateDetectionHistoryTimeWindow: 'PT10M' - maxDeliveryCount: 10 - autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' - enablePartitioning: false - enableExpress: false - } -} - -output SERVICEBUS_CONNECTION_STRING string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString -output SERVICEBUS_CONNECTION_STRING_NO_MANAGE string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'NoManage'), apiVersion).primaryConnectionString -output SERVICEBUS_CONNECTION_STRING_PREMIUM string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', sbPremiumName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString -output SERVICEBUS_ENDPOINT string = replace(replace(servicebus.properties.serviceBusEndpoint, ':443/', ''), 'https://', '') -output QUEUE_NAME string = 'testQueue' -output QUEUE_NAME_WITH_SESSIONS string = 'testQueueWithSessions'