Skip to content

Commit

Permalink
[ServiceBus] ServiceBusClient close spawned children (#13077)
Browse files Browse the repository at this point in the history
* close spawned children

* fix mypy

* update according to comments

* python27 compatible
  • Loading branch information
yunhaoling authored Aug 31, 2020
1 parent 4f859ec commit 1dad954
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 22 deletions.
3 changes: 3 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 7.0.0b6 (Unreleased)

**Breaking Changes**

* `ServiceBusClient.close()` now closes spawned senders and receivers.

## 7.0.0b5 (2020-08-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, TYPE_CHECKING
from typing import Any, List, TYPE_CHECKING
import logging

import uamqp

from ._base_handler import _parse_conn_str, ServiceBusSharedKeyCredential
from ._base_handler import _parse_conn_str, ServiceBusSharedKeyCredential, BaseHandler
from ._servicebus_sender import ServiceBusSender
from ._servicebus_receiver import ServiceBusReceiver
from ._servicebus_session_receiver import ServiceBusSessionReceiver
Expand All @@ -16,6 +17,8 @@
if TYPE_CHECKING:
from azure.core.credentials import TokenCredential

_LOGGER = logging.getLogger(__name__)


class ServiceBusClient(object):
"""The ServiceBusClient class defines a high level interface for
Expand Down Expand Up @@ -69,6 +72,7 @@ def __init__(
self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name)
# Internal flag for switching whether to apply connection sharing, pending fix in uamqp library
self._connection_sharing = False
self._handlers = [] # type: List[BaseHandler]

def __enter__(self):
if self._connection_sharing:
Expand All @@ -89,10 +93,22 @@ def _create_uamqp_connection(self):
def close(self):
# type: () -> None
"""
Close down the ServiceBus client and the underlying connection.
Close down the ServiceBus client.
All spawned senders, receivers and underlying connection will be shutdown.
:return: None
"""
for handler in self._handlers:
try:
handler.close()
except Exception as exception: # pylint: disable=broad-except
_LOGGER.error(
"Client has met an exception when closing the handler: %r. Exception: %r.",
handler._container_id, # pylint: disable=protected-access
exception,
)
del self._handlers[:]

if self._connection_sharing and self._connection:
self._connection.destroy()

Expand Down Expand Up @@ -157,7 +173,7 @@ def get_queue_sender(self, queue_name, **kwargs):
"""
# pylint: disable=protected-access
return ServiceBusSender(
handler = ServiceBusSender(
fully_qualified_namespace=self.fully_qualified_namespace,
queue_name=queue_name,
credential=self._credential,
Expand All @@ -168,6 +184,8 @@ def get_queue_sender(self, queue_name, **kwargs):
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_queue_receiver(self, queue_name, **kwargs):
# type: (str, Any) -> ServiceBusReceiver
Expand Down Expand Up @@ -205,7 +223,7 @@ def get_queue_receiver(self, queue_name, **kwargs):
"""
# pylint: disable=protected-access
return ServiceBusReceiver(
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
queue_name=queue_name,
credential=self._credential,
Expand All @@ -216,6 +234,8 @@ def get_queue_receiver(self, queue_name, **kwargs):
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_queue_deadletter_receiver(self, queue_name, **kwargs):
# type: (str, Any) -> ServiceBusReceiver
Expand Down Expand Up @@ -265,7 +285,7 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs):
queue_name=queue_name,
transfer_deadletter=kwargs.get('transfer_deadletter', False)
)
return ServiceBusReceiver(
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
Expand All @@ -277,6 +297,8 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs):
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_topic_sender(self, topic_name, **kwargs):
# type: (str, Any) -> ServiceBusSender
Expand All @@ -300,7 +322,7 @@ def get_topic_sender(self, topic_name, **kwargs):
:caption: Create a new instance of the ServiceBusSender from ServiceBusClient.
"""
return ServiceBusSender(
handler = ServiceBusSender(
fully_qualified_namespace=self.fully_qualified_namespace,
topic_name=topic_name,
credential=self._credential,
Expand All @@ -311,6 +333,8 @@ def get_topic_sender(self, topic_name, **kwargs):
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
# type: (str, str, Any) -> ServiceBusReceiver
Expand Down Expand Up @@ -353,7 +377,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
"""
# pylint: disable=protected-access
return ServiceBusReceiver(
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
topic_name=topic_name,
subscription_name=subscription_name,
Expand All @@ -365,6 +389,8 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs):
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs):
# type: (str, str, Any) -> ServiceBusReceiver
Expand Down Expand Up @@ -416,7 +442,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
subscription_name=subscription_name,
transfer_deadletter=kwargs.get('transfer_deadletter', False)
)
return ServiceBusReceiver(
handler = ServiceBusReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
entity_name=entity_name,
credential=self._credential,
Expand All @@ -428,6 +454,8 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs):
# type: (str, str, str, Any) -> ServiceBusSessionReceiver
Expand Down Expand Up @@ -473,7 +501,7 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi
"""
# pylint: disable=protected-access
return ServiceBusSessionReceiver(
handler = ServiceBusSessionReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
topic_name=topic_name,
subscription_name=subscription_name,
Expand All @@ -486,6 +514,8 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler

def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
# type: (str, str, Any) -> ServiceBusSessionReceiver
Expand Down Expand Up @@ -526,7 +556,7 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
"""
# pylint: disable=protected-access
return ServiceBusSessionReceiver(
handler = ServiceBusSessionReceiver(
fully_qualified_namespace=self.fully_qualified_namespace,
queue_name=queue_name,
credential=self._credential,
Expand All @@ -538,3 +568,5 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs):
user_agent=self._config.user_agent,
**kwargs
)
self._handlers.append(handler)
return handler
Loading

0 comments on commit 1dad954

Please sign in to comment.