Skip to content

Commit

Permalink
[ServiceBus] expand kwargs in public API (#22353)
Browse files Browse the repository at this point in the history
* inital expand kwargs w/o mgmt models

* fix mypy/pylint

* mypy

* adam + anna comments

* lint

* lint bad whitespace

* remove retry

* adams comments

* lint
  • Loading branch information
swathipil authored Jan 8, 2022
1 parent 8668461 commit 60a7f62
Show file tree
Hide file tree
Showing 13 changed files with 761 additions and 334 deletions.
48 changes: 31 additions & 17 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import datetime
import uuid
import logging
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast
from typing import Optional, Dict, List, Union, Iterable, TYPE_CHECKING, Any, Mapping, cast

import six

Expand Down Expand Up @@ -92,8 +92,24 @@ class ServiceBusMessage(
"""

def __init__(self, body, **kwargs):
# type: (Optional[Union[str, bytes]], Any) -> None
def __init__(
self,
body: Optional[Union[str, bytes]],
*,
application_properties: Optional[Dict[str, Any]] = None,
session_id: Optional[str] = None,
message_id: Optional[str] = None,
scheduled_enqueue_time_utc: Optional[datetime.datetime] = None,
time_to_live: Optional[datetime.timedelta] = None,
content_type: Optional[str] = None,
correlation_id: Optional[str] = None,
subject: Optional[str] = None,
partition_key: Optional[str] = None,
to: Optional[str] = None,
reply_to: Optional[str] = None,
reply_to_session_id: Optional[str] = None,
**kwargs: Any
) -> None:
# Although we might normally thread through **kwargs this causes
# problems as MessageProperties won't absorb spurious args.
self._encoding = kwargs.pop("encoding", "UTF-8")
Expand All @@ -108,20 +124,18 @@ def __init__(self, body, **kwargs):
self._raw_amqp_message = AmqpAnnotatedMessage(message=self.message)
else:
self._build_message(body)
self.application_properties = kwargs.pop("application_properties", None)
self.session_id = kwargs.pop("session_id", None)
self.message_id = kwargs.pop("message_id", None)
self.content_type = kwargs.pop("content_type", None)
self.correlation_id = kwargs.pop("correlation_id", None)
self.to = kwargs.pop("to", None)
self.reply_to = kwargs.pop("reply_to", None)
self.reply_to_session_id = kwargs.pop("reply_to_session_id", None)
self.subject = kwargs.pop("subject", None)
self.scheduled_enqueue_time_utc = kwargs.pop(
"scheduled_enqueue_time_utc", None
)
self.time_to_live = kwargs.pop("time_to_live", None)
self.partition_key = kwargs.pop("partition_key", None)
self.application_properties = application_properties
self.session_id = session_id
self.message_id = message_id
self.content_type = content_type
self.correlation_id = correlation_id
self.to = to
self.reply_to = reply_to
self.reply_to_session_id = reply_to_session_id
self.subject = subject
self.scheduled_enqueue_time_utc = scheduled_enqueue_time_utc
self.time_to_live = time_to_live
self.partition_key = partition_key

def __str__(self):
# type: () -> str
Expand Down
111 changes: 95 additions & 16 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, Union, TYPE_CHECKING
from typing import Any, Union, Optional, TYPE_CHECKING
import logging
from weakref import WeakSet

Expand All @@ -15,16 +15,25 @@
)
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
from ._common.auto_lock_renewer import AutoLockRenewer
from ._common._configuration import Configuration
from ._common.utils import (
create_authentication,
generate_dead_letter_entity_name,
strip_protocol_from_uri,
)
from ._common.constants import ServiceBusSubQueue
from ._common.constants import (
ServiceBusSubQueue,
ServiceBusReceiveMode,
)

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential, AzureSasCredential, AzureNamedKeyCredential
from azure.core.credentials import (
TokenCredential,
AzureSasCredential,
AzureNamedKeyCredential,
)


_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -75,15 +84,32 @@ class ServiceBusClient(object):
"""

def __init__(self, fully_qualified_namespace, credential, **kwargs):
# type: (str, Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential], Any) -> None
def __init__(
self,
fully_qualified_namespace: str,
credential: Union[
"TokenCredential", "AzureSasCredential", "AzureNamedKeyCredential"
],
*,
retry_total: int = 3,
retry_backoff_factor: float = 0.8,
retry_backoff_max: int = 120,
retry_mode: str = "exponential",
**kwargs: Any
) -> None:
# If the user provided http:// or sb://, let's be polite and strip that.
self.fully_qualified_namespace = strip_protocol_from_uri(
fully_qualified_namespace.strip()
)

self._credential = credential
self._config = Configuration(**kwargs)
self._config = Configuration(
retry_total=retry_total,
retry_backoff_factor=retry_backoff_factor,
retry_backoff_max=retry_backoff_max,
retry_mode=retry_mode,
**kwargs
)
self._connection = None
# Optional entity name, can be the name of Queue or Topic. Intentionally not advertised, typically be needed.
self._entity_name = kwargs.get("entity_name")
Expand Down Expand Up @@ -134,8 +160,16 @@ def close(self):
self._connection.destroy()

@classmethod
def from_connection_string(cls, conn_str, **kwargs):
# type: (str, Any) -> ServiceBusClient
def from_connection_string(
cls,
conn_str: str,
*,
retry_total: int = 3,
retry_backoff_factor: float = 0.8,
retry_backoff_max: int = 120,
retry_mode: str = "exponential",
**kwargs: Any
) -> "ServiceBusClient":
"""
Create a ServiceBusClient from a connection string.
Expand Down Expand Up @@ -181,6 +215,10 @@ def from_connection_string(cls, conn_str, **kwargs):
fully_qualified_namespace=host,
entity_name=entity_in_conn_str or kwargs.pop("entity_name", None),
credential=credential, # type: ignore
retry_total=retry_total,
retry_backoff_factor=retry_backoff_factor,
retry_backoff_max=retry_backoff_max,
retry_mode=retry_mode,
**kwargs
)

Expand Down Expand Up @@ -227,8 +265,20 @@ def get_queue_sender(self, queue_name, **kwargs):
self._handlers.add(handler)
return handler

def get_queue_receiver(self, queue_name, **kwargs):
# type: (str, Any) -> ServiceBusReceiver
def get_queue_receiver(
self,
queue_name: str,
*,
session_id: Optional[str] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
] = ServiceBusReceiveMode.PEEK_LOCK,
max_wait_time: Optional[float] = None,
auto_lock_renewer: Optional[AutoLockRenewer] = None,
prefetch_count: int = 0,
**kwargs: Any
) -> ServiceBusReceiver:
"""Get ServiceBusReceiver for the specific queue.
:param str queue_name: The path of specific Service Bus Queue the client connects to.
Expand Down Expand Up @@ -280,8 +330,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
"the connection string used to construct the ServiceBusClient."
)

sub_queue = kwargs.get("sub_queue", None)
if sub_queue and kwargs.get("session_id"):
if sub_queue and session_id:
raise ValueError(
"session_id and sub_queue can not be specified simultaneously. "
"To connect to the sub queue of a sessionful queue, "
Expand Down Expand Up @@ -314,6 +363,12 @@ def get_queue_receiver(self, queue_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
session_id=session_id,
sub_queue=sub_queue,
receive_mode=receive_mode,
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
**kwargs
)
self._handlers.add(handler)
Expand Down Expand Up @@ -361,8 +416,21 @@ def get_topic_sender(self, topic_name, **kwargs):
self._handlers.add(handler)
return handler

def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
# type: (str, str, Any) -> ServiceBusReceiver
def get_subscription_receiver(
self,
topic_name: str,
subscription_name: str,
*,
session_id: Optional[str] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[
ServiceBusReceiveMode, str
] = ServiceBusReceiveMode.PEEK_LOCK,
max_wait_time: Optional[float] = None,
auto_lock_renewer: Optional[AutoLockRenewer] = None,
prefetch_count: int = 0,
**kwargs: Any
) -> ServiceBusReceiver:
"""Get ServiceBusReceiver for the specific subscription under the topic.
:param str topic_name: The name of specific Service Bus Topic the client connects to.
Expand Down Expand Up @@ -417,8 +485,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
"the connection string used to construct the ServiceBusClient."
)

sub_queue = kwargs.get("sub_queue", None)
if sub_queue and kwargs.get("session_id"):
if sub_queue and session_id:
raise ValueError(
"session_id and sub_queue can not be specified simultaneously. "
"To connect to the sub queue of a sessionful subscription, "
Expand Down Expand Up @@ -446,6 +513,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
session_id=session_id,
sub_queue=sub_queue,
receive_mode=receive_mode,
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
**kwargs
)
except ValueError:
Expand All @@ -467,6 +540,12 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
retry_total=self._config.retry_total,
retry_backoff_factor=self._config.retry_backoff_factor,
retry_backoff_max=self._config.retry_backoff_max,
session_id=session_id,
sub_queue=sub_queue,
receive_mode=receive_mode,
max_wait_time=max_wait_time,
auto_lock_renewer=auto_lock_renewer,
prefetch_count=prefetch_count,
**kwargs
)
self._handlers.add(handler)
Expand Down
Loading

0 comments on commit 60a7f62

Please sign in to comment.