Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] uamqp/pyamqp switch #28512

Merged
Merged
Show file tree
Hide file tree
Changes from 53 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
111fb5b
add transports to sender
swathipil Jan 27, 2023
e5592ba
fix pyamqp send
swathipil Jan 30, 2023
2379018
update default amqps port
swathipil Jan 30, 2023
83f3d57
update send to handle diff transports for sender and batch
swathipil Jan 30, 2023
972b673
update to handle sending amqp annotated message
swathipil Jan 30, 2023
ade4be8
fix scheduled sending
swathipil Jan 31, 2023
67ae4dc
add switch for mgmt requests
swathipil Jan 31, 2023
6d1107d
cleanup of pyamqp imports
swathipil Jan 31, 2023
390f197
add switch to receiver
swathipil Feb 2, 2023
7d210d5
add switch to receiver mixin/mgmt handlers
swathipil Feb 2, 2023
1f91d25
fix build received message
swathipil Feb 3, 2023
3a6347f
bugs
swathipil Feb 3, 2023
51a1ec7
update tests
swathipil Feb 7, 2023
ea24521
fix typing/lint
swathipil Feb 7, 2023
b25aaf4
update async receiver
swathipil Feb 8, 2023
28c4c3c
update async sender
swathipil Feb 8, 2023
3ebe41f
fix async deferred receive
swathipil Feb 8, 2023
95efc93
parametrize tests
swathipil Feb 14, 2023
2c50e56
fix mgmt client to take cred with str token
swathipil Feb 14, 2023
7c6486f
parametrize async tests
swathipil Feb 14, 2023
0bbc62a
update test errors file
swathipil Feb 14, 2023
c9e61c7
update queue tests
swathipil Feb 14, 2023
0dc407e
update async test argpasser
swathipil Feb 15, 2023
2906b7c
Merge branch 'feature/servicebus/pyproto' into swathipil/sb/uamqp-switch
swathipil Feb 15, 2023
f5e2a31
fix failing async tests
swathipil Feb 16, 2023
97be492
fix outgoing pyamqp message annotations
swathipil Feb 16, 2023
f5594cf
fix renew lock without auto complete uamqp
swathipil Feb 16, 2023
a1b25b5
unskip message backcompat tests
swathipil Feb 24, 2023
2941ad7
update backcompat tests
swathipil Feb 28, 2023
f3fce14
fix failing message backcompat tests
swathipil Mar 3, 2023
a76078c
fix encoded size tests
swathipil Mar 3, 2023
13f1c26
fix merge conflicts
swathipil Mar 17, 2023
71439f4
merge uamqp/pyamqp receive iterators
swathipil Mar 17, 2023
ba83d0c
rename uamqp_transport_func
swathipil Mar 17, 2023
2222d05
update iterator tests for uamqp+pyamqp
swathipil Mar 18, 2023
958eeb6
lint
swathipil Mar 20, 2023
2e7aca9
fix failing tests
swathipil Mar 20, 2023
bdc0ba8
unskip async non live tests
swathipil Mar 21, 2023
4ee4feb
fix failing release messages
swathipil Mar 21, 2023
310f76c
fix deadletter entity name bug
swathipil Mar 24, 2023
ca48c18
fix more tests
swathipil Mar 24, 2023
8f9a4dd
address comments
swathipil Mar 27, 2023
dc2b041
address more comments
swathipil Mar 27, 2023
d05886f
more fixes
swathipil Mar 27, 2023
6e6fa33
typing
swathipil Mar 28, 2023
7e7e203
fix transport classes/docstring/typing
swathipil Mar 28, 2023
45ce2ed
update docstring
swathipil Mar 28, 2023
70982a9
resolve merge conflicts
swathipil Mar 29, 2023
7ce5202
more conflicts
swathipil Mar 29, 2023
a4e9024
fix tracing, to update app props with diagnostic id
swathipil Mar 29, 2023
1ee84d1
fix mypy
swathipil Mar 31, 2023
92a86ed
lint
swathipil Mar 31, 2023
1741613
more lint
swathipil Mar 31, 2023
2af5adc
fix exception bug
swathipil Mar 31, 2023
e4a9b95
kashif/libba comments
swathipil Mar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,20 @@

### Features Added

- A new boolean keyword argument `uamqp_transport` has been added to sync and async `ServiceBusClient` constructors which indicates whether to use the `uamqp` library or the default pure Python AMQP library as the underlying transport.
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved

### Bugs Fixed

- Fixed a bug where sync and async `ServiceBusAdministrationClient` expected `credential` with `get_token` method returning `AccessToken.token` of type `bytes` and not `str`, now matching the documentation.
- Fixed a bug where `raw_amqp_message.header` and `message.header` properties on `ServiceReceivedBusMessage` were returned with `durable`, `first_acquirer`, and `priority` properties set by default, rather than the values returned by the service.

### Other Changes

- The `message` attribute on `ServiceBus`/`ServiceBusMessageBatch`/`ServiceBusReceivedMessage`, which previously exposed the `uamqp.Message`/`uamqp.BatchMessage`, has been deprecated.
- `LegacyMessage`/`LegacyBatchMessage` objects returned by the `message` attribute on `ServiceBus`/`ServiceBusMessageBatch` have been introduced to help facilitate the transition.
- Removed uAMQP from required dependencies.
- Adding `uamqp >= 1.6.3` as an optional dependency for use with the `uamqp_transport` keyword.

## 7.9.0 (Unreleased)

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from ._pyamqp import constants

from ._version import VERSION

__version__ = VERSION
Expand All @@ -23,6 +21,7 @@
ServiceBusSubQueue,
ServiceBusMessageState,
ServiceBusSessionFilter,
TransportType,
NEXT_AVAILABLE_SESSION,
)
from ._common.auto_lock_renewer import AutoLockRenewer
Expand All @@ -31,8 +30,6 @@
ServiceBusConnectionStringProperties,
)

TransportType = constants.TransportType

__all__ = [
"ServiceBusMessage",
"ServiceBusMessageBatch",
Expand Down
174 changes: 98 additions & 76 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,50 +1,52 @@
# --------------------------------------------------------------------------------------------
# # --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Union, TYPE_CHECKING
from urllib.parse import urlparse

from azure.core.pipeline.policies import RetryMode
from .._pyamqp.constants import TransportType


DEFAULT_AMQPS_PORT = 1571
DEFAULT_AMQP_WSS_PORT = 443

from .constants import DEFAULT_AMQPS_PORT, DEFAULT_AMQP_WSS_PORT, TransportType
if TYPE_CHECKING:
from .._transport._base import AmqpTransport
from ..aio._transport._base_async import AmqpTransportAsync

class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self.user_agent = kwargs.get("user_agent") # type: Optional[str]
self.retry_total = kwargs.get("retry_total", 3) # type: int
self.user_agent: Optional[str] = kwargs.get("user_agent")
self.retry_total: int = kwargs.get("retry_total", 3)
self.retry_mode = RetryMode(kwargs.get("retry_mode", 'exponential'))
self.retry_backoff_factor = kwargs.get(
self.retry_backoff_factor: float = kwargs.get(
"retry_backoff_factor", 0.8
) # type: float
self.retry_backoff_max = kwargs.get("retry_backoff_max", 120) # type: int
self.logging_enable = kwargs.get("logging_enable", False) # type: bool
self.http_proxy = kwargs.get("http_proxy") # type: Optional[Dict[str, Any]]
)
self.retry_backoff_max: int = kwargs.get("retry_backoff_max", 120)
self.logging_enable: bool = kwargs.get("logging_enable", False)
self.http_proxy: Optional[Dict[str, Any]] = kwargs.get("http_proxy")

self.custom_endpoint_address = kwargs.get("custom_endpoint_address") # type: Optional[str]
self.connection_verify = kwargs.get("connection_verify") # type: Optional[str]
self.custom_endpoint_address: Optional[str] = kwargs.get("custom_endpoint_address")
self.connection_verify: Optional[str] = kwargs.get("connection_verify")
self.connection_port = DEFAULT_AMQPS_PORT
self.custom_endpoint_hostname = None
self.hostname = kwargs.pop("hostname")
amqp_transport: Union["AmqpTransport", "AmqpTransportAsync"] = kwargs.pop("amqp_transport")

self.transport_type = (
TransportType.AmqpOverWebsocket
if self.http_proxy
else kwargs.get("transport_type", TransportType.Amqp)
)
# The following configs are not public, for internal usage only
self.auth_timeout = kwargs.get("auth_timeout", 60) # type: int
self.auth_timeout: float = kwargs.get("auth_timeout", 60)
self.encoding = kwargs.get("encoding", "UTF-8")
self.auto_reconnect = kwargs.get("auto_reconnect", True)
self.keep_alive = kwargs.get("keep_alive", 30)
self.timeout = kwargs.get("timeout", 60) # type: float
self.timeout: float = kwargs.get("timeout", 60)

if self.http_proxy or self.transport_type == TransportType.AmqpOverWebsocket:
self.transport_type = TransportType.AmqpOverWebsocket
self.connection_port = DEFAULT_AMQP_WSS_PORT
if amqp_transport.KIND == "pyamqp":
self.hostname += "/$servicebus/websocket"

# custom end point
if self.custom_endpoint_address:
Expand All @@ -55,5 +57,7 @@ def __init__(self, **kwargs):
endpoint = urlparse(self.custom_endpoint_address)
self.transport_type = TransportType.AmqpOverWebsocket
self.custom_endpoint_hostname = endpoint.hostname
if amqp_transport.KIND == "pyamqp":
self.custom_endpoint_address += "/$servicebus/websocket"
# in case proxy and custom endpoint are both provided, we default port to 443 if it's not provided
self.connection_port = endpoint.port or DEFAULT_AMQP_WSS_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# -------------------------------------------------------------------------
from enum import Enum
from azure.core import CaseInsensitiveEnumMeta
from .._pyamqp import constants

VENDOR = b"com.microsoft"
DATETIMEOFFSET_EPOCH = 621355968000000000
Expand Down Expand Up @@ -60,6 +59,8 @@
JWT_TOKEN_SCOPE = "https://servicebus.azure.net//.default"
USER_AGENT_PREFIX = "azsdk-python-servicebus"
CONSUMER_IDENTIFIER = VENDOR + b":receiver-name"
UAMQP_LIBRARY = "uamqp"
PYAMQP_LIBRARY = "pyamqp"

MANAGEMENT_PATH_SUFFIX = "/$management"

Expand Down Expand Up @@ -178,12 +179,6 @@ class ServiceBusMessageState(int, Enum):
DEFERRED = 1
SCHEDULED = 2

# To enable extensible string enums for the public facing parameter, and translate to the "real" uamqp constants.
ServiceBusToAMQPReceiveModeMap = {
ServiceBusReceiveMode.PEEK_LOCK: constants.ReceiverSettleMode.Second,
ServiceBusReceiveMode.RECEIVE_AND_DELETE: constants.ReceiverSettleMode.First,
}


class ServiceBusSessionFilter(Enum):
NEXT_AVAILABLE = 0
Expand All @@ -195,3 +190,17 @@ class ServiceBusSubQueue(str, Enum, metaclass=CaseInsensitiveEnumMeta):


NEXT_AVAILABLE_SESSION = ServiceBusSessionFilter.NEXT_AVAILABLE

## all below - previously uamqp
class TransportType(Enum):
"""Transport type
The underlying transport protocol type:
Amqp: AMQP over the default TCP transport protocol, it uses port 5671.
AmqpOverWebsocket: Amqp over the Web Sockets transport protocol, it uses
port 443.
"""
Amqp = 1
AmqpOverWebsocket = 2

DEFAULT_AMQPS_PORT = 5671
DEFAULT_AMQP_WSS_PORT = 443
Loading