Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] uamqp/pyamqp switch #28512

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
111fb5b
add transports to sender
swathipil Jan 27, 2023
e5592ba
fix pyamqp send
swathipil Jan 30, 2023
2379018
update default amqps port
swathipil Jan 30, 2023
83f3d57
update send to handle diff transports for sender and batch
swathipil Jan 30, 2023
972b673
update to handle sending amqp annotated message
swathipil Jan 30, 2023
ade4be8
fix scheduled sending
swathipil Jan 31, 2023
67ae4dc
add switch for mgmt requests
swathipil Jan 31, 2023
6d1107d
cleanup of pyamqp imports
swathipil Jan 31, 2023
390f197
add switch to receiver
swathipil Feb 2, 2023
7d210d5
add switch to receiver mixin/mgmt handlers
swathipil Feb 2, 2023
1f91d25
fix build received message
swathipil Feb 3, 2023
3a6347f
bugs
swathipil Feb 3, 2023
51a1ec7
update tests
swathipil Feb 7, 2023
ea24521
fix typing/lint
swathipil Feb 7, 2023
b25aaf4
update async receiver
swathipil Feb 8, 2023
28c4c3c
update async sender
swathipil Feb 8, 2023
3ebe41f
fix async deferred receive
swathipil Feb 8, 2023
95efc93
parametrize tests
swathipil Feb 14, 2023
2c50e56
fix mgmt client to take cred with str token
swathipil Feb 14, 2023
7c6486f
parametrize async tests
swathipil Feb 14, 2023
0bbc62a
update test errors file
swathipil Feb 14, 2023
c9e61c7
update queue tests
swathipil Feb 14, 2023
0dc407e
update async test argpasser
swathipil Feb 15, 2023
2906b7c
Merge branch 'feature/servicebus/pyproto' into swathipil/sb/uamqp-switch
swathipil Feb 15, 2023
f5e2a31
fix failing async tests
swathipil Feb 16, 2023
97be492
fix outgoing pyamqp message annotations
swathipil Feb 16, 2023
f5594cf
fix renew lock without auto complete uamqp
swathipil Feb 16, 2023
a1b25b5
unskip message backcompat tests
swathipil Feb 24, 2023
2941ad7
update backcompat tests
swathipil Feb 28, 2023
f3fce14
fix failing message backcompat tests
swathipil Mar 3, 2023
a76078c
fix encoded size tests
swathipil Mar 3, 2023
13f1c26
fix merge conflicts
swathipil Mar 17, 2023
71439f4
merge uamqp/pyamqp receive iterators
swathipil Mar 17, 2023
ba83d0c
rename uamqp_transport_func
swathipil Mar 17, 2023
2222d05
update iterator tests for uamqp+pyamqp
swathipil Mar 18, 2023
958eeb6
lint
swathipil Mar 20, 2023
2e7aca9
fix failing tests
swathipil Mar 20, 2023
bdc0ba8
unskip async non live tests
swathipil Mar 21, 2023
4ee4feb
fix failing release messages
swathipil Mar 21, 2023
310f76c
fix deadletter entity name bug
swathipil Mar 24, 2023
ca48c18
fix more tests
swathipil Mar 24, 2023
8f9a4dd
address comments
swathipil Mar 27, 2023
dc2b041
address more comments
swathipil Mar 27, 2023
d05886f
more fixes
swathipil Mar 27, 2023
6e6fa33
typing
swathipil Mar 28, 2023
7e7e203
fix transport classes/docstring/typing
swathipil Mar 28, 2023
45ce2ed
update docstring
swathipil Mar 28, 2023
70982a9
resolve merge conflicts
swathipil Mar 29, 2023
7ce5202
more conflicts
swathipil Mar 29, 2023
a4e9024
fix tracing, to update app props with diagnostic id
swathipil Mar 29, 2023
1ee84d1
fix mypy
swathipil Mar 31, 2023
92a86ed
lint
swathipil Mar 31, 2023
1741613
more lint
swathipil Mar 31, 2023
2af5adc
fix exception bug
swathipil Mar 31, 2023
e4a9b95
kashif/libba comments
swathipil Mar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -947,8 +947,17 @@ def __repr__(self) -> str: # pylint: disable=too-many-branches,too-many-stateme
message_repr += ", locked_until_utc=<read-error>"
return "ServiceBusReceivedMessage({})".format(message_repr)[:1024]

@property
@property # type: ignore[misc] # TODO: ignoring error to copy over setter, since it's inherited
def message(self) -> LegacyMessage:
"""DEPRECATED: Get the underlying LegacyMessage.
This is deprecated and will be removed in a later release.

:rtype: LegacyMessage
"""
warnings.warn(
"The `message` property is deprecated and will be removed in future versions.",
DeprecationWarning,
)
if not self._uamqp_message:
if not self._settled:
settler = self._receiver._handler # pylint:disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# license information.
# -------------------------------------------------------------------------
import uuid
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Union

from ..exceptions import MessageAlreadySettled
from .constants import (
Expand All @@ -15,10 +15,11 @@

if TYPE_CHECKING:
from .._transport._base import AmqpTransport
from ..aio._transport._base_async import AmqpTransportAsync

class ReceiverMixin(object): # pylint: disable=too-many-instance-attributes
def _populate_attributes(self, **kwargs):
self._amqp_transport: "AmqpTransport"
self._amqp_transport: Union["AmqpTransport", "AmqpTransportAsync"]
if kwargs.get("subscription_name"):
self._subscription_name = kwargs.get("subscription_name")
self._is_subscription = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
from .receiver_mixins import ReceiverMixin
from .._servicebus_session import BaseSession
from .._transport._base import AmqpTransport
from ..aio._transport._base_async import AmqpTransportAsync

MessagesType = Union[
Mapping[str, Any],
Expand Down Expand Up @@ -304,7 +305,7 @@ def receive_trace_context_manager(

def trace_message(
message: Union["uamqp_Message", "pyamqp_Message"],
amqp_transport: "AmqpTransport",
amqp_transport: Union["AmqpTransport", "AmqpTransportAsync"],
parent_span: Optional["AbstractSpan"] = None
) -> Union["uamqp_Message", "pyamqp_Message"]:
"""Add tracing information to this message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ class ApacheFilters(object):
'default_outcome',
'outcomes',
'capabilities'
])
Source.__new__.__defaults__ = (None,) * len(Source._fields) # type: ignore
],
defaults=(None,) * 11 # type: ignore
)
Source._code = 0x00000028 # type: ignore # pylint: disable=protected-access
Source._definition = ( # type: ignore # pylint: disable=protected-access
FIELD("address", AMQPTypes.string, False, None, False),
Expand Down Expand Up @@ -227,9 +228,10 @@ class ApacheFilters(object):
'dynamic',
'dynamic_node_properties',
'capabilities'
])
],
defaults=(None,) * 7 # type: ignore
)
Target._code = 0x00000029 # type: ignore # pylint: disable=protected-access
Target.__new__.__defaults__ = (None,) * len(Target._fields) # type: ignore # type: ignore # pylint: disable=protected-access
Target._definition = ( # type: ignore # pylint: disable=protected-access
FIELD("address", AMQPTypes.string, False, None, False),
FIELD("durable", AMQPTypes.uint, False, "none", False),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
'ttl',
'first_acquirer',
'delivery_count'
])
],
defaults=(None,) * 5 # type: ignore
)
Header._code = 0x00000070 # type: ignore # pylint:disable=protected-access
Header.__new__.__defaults__ = (None,) * len(Header._fields) # type: ignore
Header._definition = ( # type: ignore # pylint:disable=protected-access
FIELD("durable", AMQPTypes.boolean, False, None, False),
FIELD("priority", AMQPTypes.ubyte, False, None, False),
Expand Down Expand Up @@ -91,7 +92,9 @@
'group_id',
'group_sequence',
'reply_to_group_id'
])
],
defaults=(None,) * 13 # type: ignore
)
Properties._code = 0x00000073 # type: ignore # pylint:disable=protected-access
Properties.__new__.__defaults__ = (None,) * len(Properties._fields) # type: ignore
Properties._definition = ( # type: ignore # pylint:disable=protected-access
Expand Down Expand Up @@ -177,8 +180,9 @@
'sequence',
'value',
'footer',
])
Message.__new__.__defaults__ = (None,) * len(Message._fields) # type: ignore
],
defaults=(None,) * 9 # type: ignore
)
Message._code = 0 # type: ignore # pylint:disable=protected-access
Message._definition = ( # type: ignore # pylint:disable=protected-access
(0x00000070, FIELD("header", Header, False, None, False)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(
**kwargs: Any
) -> None:
uamqp_transport = kwargs.pop("uamqp_transport", False)
if uamqp_transport and not UamqpTransport:
if uamqp_transport and UamqpTransport is None:
raise ValueError("To use the uAMQP transport, please install `uamqp>=1.6.3,<2.0.0`.")
self._amqp_transport = UamqpTransport if uamqp_transport else PyamqpTransport

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def _create_handler(self, auth: Union["pyamqp_JWTTokenAuth", "uamqp_JWTTokenAuth
if self._prefetch_count == 1:
# pylint: disable=protected-access
self._handler._message_received = functools.partial(
self._amqp_transport.enhanced_message_received,
self._amqp_transport.enhanced_message_received, # type: ignore[attr-defined]
self
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,12 @@ def send_messages(
raise ValueError("The timeout must be greater than 0.")

try: # Short circuit noop if an empty list or batch is provided.
if len(message) == 0: # pylint: disable=len-as-condition
if len(cast(Union[List, ServiceBusMessageBatch], message)) == 0: # pylint: disable=len-as-condition
return
except TypeError: # continue if ServiceBusMessage
pass

obj_message: Union[ServiceBusMessage, ServiceBusMessageBatch]
with send_trace_context_manager() as send_span:
if isinstance(message, ServiceBusMessageBatch):
# If AmqpTransports are not the same, create batch with correct BatchMessage.
Expand All @@ -423,7 +424,7 @@ def send_messages(
batch._from_list(message._messages, send_span) # type: ignore
obj_message = batch
else:
obj_message: "MessageObjTypes" = message
obj_message = message
else:
obj_message = transform_outbound_messages( # type: ignore
message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message
Expand All @@ -439,7 +440,6 @@ def send_messages(
parent_span=send_span
)

obj_message = cast(Union[ServiceBusMessage, ServiceBusMessageBatch], obj_message)
if send_span:
self._add_span_request_attributes(send_span)
self._do_retryable_operation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import annotations
from typing import Union, TYPE_CHECKING, Dict, Optional, List, Any, Callable
from typing import Union, TYPE_CHECKING, Dict, Any, Callable
from abc import ABC, abstractmethod

if TYPE_CHECKING:
Expand All @@ -22,18 +22,20 @@ class AmqpTransport(ABC): # pylint: disable=too-many-public-methods
MAX_FRAME_SIZE_BYTES: int
MAX_MESSAGE_LENGTH_BYTES: int
TIMEOUT_FACTOR: int
#CONNECTION_CLOSING_STATES: Tuple
TRANSPORT_IDENTIFIER: str

ServiceBusToAMQPReceiveModeMap: Dict[str, Any]

# define symbols
PRODUCT_SYMBOL: Union[uamqp_types.AMQPSymbol, str, bytes]
VERSION_SYMBOL: Union[uamqp_types.AMQPSymbol, str, bytes]
FRAMEWORK_SYMBOL: Union[uamqp_types.AMQPSymbol, str, bytes]
PLATFORM_SYMBOL: Union[uamqp_types.AMQPSymbol, str, bytes]
USER_AGENT_SYMBOL: Union[uamqp_types.AMQPSymbol, str, bytes]
PROP_PARTITION_KEY_AMQP_SYMBOL: Union[uamqp_types.AMQPSymbol, str, bytes]
AMQP_LONG_VALUE: Union[uamqp_types.AMQPLong, Dict[str, str]]
AMQP_ARRAY_VALUE: Union[uamqp_types.AMQPArray, Dict[str, str]]
AMQP_LONG_VALUE: Callable
AMQP_ARRAY_VALUE: Callable
AMQP_UINT_VALUE: Callable

@staticmethod
@abstractmethod
Expand All @@ -45,7 +47,7 @@ def build_message(**kwargs):

@staticmethod
@abstractmethod
def build_batch_message(**kwargs):
def build_batch_message(data):
"""
Creates a uamqp.BatchMessage or pyamqp.BatchMessage with given arguments.
:rtype: uamqp.BatchMessage or pyamqp.BatchMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
import datetime
from datetime import timezone
from typing import Optional, Tuple, cast, List, TYPE_CHECKING, Any, Callable, Dict
from typing import Optional, Tuple, cast, List, TYPE_CHECKING, Any, Callable, Dict, Union, Iterator, Type

from .._pyamqp import (
utils,
Expand Down Expand Up @@ -93,7 +93,7 @@

if TYPE_CHECKING:
from logging import Logger
from ..amqp import AmqpAnnotatedMessage
from ..amqp import AmqpAnnotatedMessage, AmqpMessageHeader, AmqpMessageProperties
from .._servicebus_receiver import ServiceBusReceiver
from .._servicebus_sender import ServiceBusSender
from .._common.message import ServiceBusReceivedMessage, ServiceBusMessage, ServiceBusMessageBatch
Expand Down Expand Up @@ -183,9 +183,9 @@ class PyamqpTransport(AmqpTransport): # pylint: disable=too-many-public-method
#ERROR_CONDITIONS = [condition.value for condition in ErrorCondition]

# amqp value types
AMQP_LONG_VALUE = amqp_long_value
AMQP_ARRAY_VALUE = amqp_array_value
AMQP_UINT_VALUE = amqp_uint_value
AMQP_LONG_VALUE: Callable = amqp_long_value
AMQP_ARRAY_VALUE: Callable = amqp_array_value
AMQP_UINT_VALUE: Callable = amqp_uint_value

# errors
TIMEOUT_ERROR = TimeoutError
Expand Down Expand Up @@ -243,7 +243,8 @@ def to_outgoing_amqp_message(annotated_message: "AmqpAnnotatedMessage") -> "Mess
ttl_set = False
header_vals = annotated_message.header.values() if annotated_message.header else None
# If header and non-None header values, create outgoing header.
if annotated_message.header and header_vals.count(None) != len(header_vals):
if header_vals and header_vals.count(None) != len(header_vals):
annotated_message.header = cast("AmqpMessageHeader", annotated_message.header)
message_header = Header(
delivery_count=annotated_message.header.delivery_count,
ttl=annotated_message.header.time_to_live,
Expand All @@ -264,7 +265,8 @@ def to_outgoing_amqp_message(annotated_message: "AmqpAnnotatedMessage") -> "Mess
message_properties = None
properties_vals = annotated_message.properties.values() if annotated_message.properties else None
# If properties and non-None properties values, create outgoing properties.
if annotated_message.properties and properties_vals.count(None) != len(properties_vals):
if properties_vals and properties_vals.count(None) != len(properties_vals):
annotated_message.properties = cast("AmqpMessageProperties", annotated_message.properties)
creation_time = None
absolute_expiry_time = None
if ttl_set:
Expand Down Expand Up @@ -292,7 +294,7 @@ def to_outgoing_amqp_message(annotated_message: "AmqpAnnotatedMessage") -> "Mess
reply_to_group_id=annotated_message.properties.reply_to_group_id,
)
elif ttl_set:
message_properties = Properties(
message_properties = Properties( # type: ignore[call-arg]
creation_time=creation_time_from_ttl if ttl_set else None,
absolute_expiry_time=absolute_expiry_time_from_ttl if ttl_set else None,
)
Expand Down Expand Up @@ -457,7 +459,7 @@ def create_send_client(
@staticmethod
def send_messages(
sender: "ServiceBusSender",
message: "Message",
message: Union["ServiceBusMessage", "ServiceBusMessageBatch"],
logger: "Logger",
timeout: int,
last_exception: Optional[Exception]
Expand Down Expand Up @@ -507,7 +509,7 @@ def create_source(source: "Source", session_filter: Optional[str]) -> "Source":
:param bytes session_filter: Required.
"""
filter_map = {SESSION_FILTER: session_filter}
source = Source(address=source, filters=filter_map)
source = Source(address=source, filters=filter_map) # type: ignore[call-arg]
return source

@staticmethod
Expand Down Expand Up @@ -584,7 +586,7 @@ def on_attach(
@staticmethod
def iter_contextual_wrapper(
receiver: "ServiceBusReceiver", max_wait_time: Optional[int] = None
) -> "ServiceBusReceivedMessage":
) -> Iterator["ServiceBusReceivedMessage"]:
"""The purpose of this wrapper is to allow both state restoration (for multiple concurrent iteration)
and per-iter argument passing that requires the former."""
while True:
Expand All @@ -609,7 +611,9 @@ def iter_next(
receiver._open()
if not receiver._message_iter or wait_time:
receiver._message_iter = receiver._handler.receive_messages_iter(timeout=wait_time)
pyamqp_message = next(receiver._message_iter)
pyamqp_message = next(
cast(Iterator["Message"], receiver._message_iter)
)
message = receiver._build_received_message(pyamqp_message)
if (
receiver._auto_lock_renewer
Expand Down Expand Up @@ -640,7 +644,7 @@ def enhanced_message_received(
@staticmethod
def build_received_message(
receiver: "ServiceBusReceiver",
message_type: "ServiceBusReceivedMessage",
message_type: Type["ServiceBusReceivedMessage"],
received: "Message"
) -> "ServiceBusReceivedMessage":
"""
Expand All @@ -656,7 +660,7 @@ def build_received_message(
@staticmethod
def get_current_time(
handler: "ReceiveClient"
) -> int: # pylint: disable=unused-argument
) -> float: # pylint: disable=unused-argument
"""
Gets the current time.
"""
Expand Down Expand Up @@ -717,7 +721,9 @@ def settle_message_via_receiver_link(

@staticmethod
def parse_received_message(
message: "Message", message_type: "ServiceBusReceivedMessage", **kwargs: Any
message: "Message",
message_type: Type["ServiceBusReceivedMessage"],
**kwargs: Any
) -> List["ServiceBusReceivedMessage"]:
"""
Parses peek/deferred op messages into ServiceBusReceivedMessage.
Expand Down Expand Up @@ -836,10 +842,10 @@ def mgmt_client_request(
@staticmethod
def _handle_amqp_exception_with_condition(
logger: "Logger",
condition: "ErrorCondition",
condition: Optional["ErrorCondition"],
description: str,
exception: "AMQPException" = None,
status_code: str = None
exception: Optional["AMQPException"] = None,
status_code: Optional[str] = None
) -> "ServiceBusError":
# handling AMQP Errors that have the condition field or the mgmt handler
logger.info(
Expand All @@ -848,6 +854,7 @@ def _handle_amqp_exception_with_condition(
condition,
description,
)
error_cls: Type["ServiceBusError"]
if isinstance(exception, AuthenticationException):
logger.info("AMQP Connection authentication error occurred: (%r).", exception)
error_cls = ServiceBusAuthenticationError
Expand All @@ -870,7 +877,7 @@ def _handle_amqp_exception_with_condition(
elif condition == ErrorCondition.UnknownError or isinstance(exception, AMQPConnectionError):
error_cls = ServiceBusConnectionError
else:
error_cls = _ERROR_CODE_TO_ERROR_MAPPING.get(condition, ServiceBusError)
error_cls = _ERROR_CODE_TO_ERROR_MAPPING.get(cast(bytes, condition), ServiceBusError)

error = error_cls(
message=description,
Expand All @@ -889,9 +896,9 @@ def _handle_amqp_exception_with_condition(
def handle_amqp_mgmt_error(
logger: "Logger",
error_description: "str",
condition: "ErrorCondition" = None,
description: str = None,
status_code: str = None
condition: Optional["ErrorCondition"] = None,
description: Optional[str] = None,
status_code: Optional[str] = None
) -> "ServiceBusError":
if description:
error_description += f" {description}."
Expand Down
Loading