From ef0edbba89a489b7d6b44764b360cc474633792d Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Wed, 12 Aug 2020 15:14:17 -0700 Subject: [PATCH 1/4] close spawned children --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 3 + .../azure/servicebus/_servicebus_client.py | 40 ++++++++++--- .../aio/_servicebus_client_async.py | 38 +++++++++--- .../tests/async_tests/test_sb_client_async.py | 58 +++++++++++++++++++ .../azure-servicebus/tests/test_sb_client.py | 43 +++++++++++++- 5 files changed, 164 insertions(+), 18 deletions(-) create mode 100644 sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 3556002dbdab..122d0f541f31 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -2,6 +2,9 @@ ## 7.0.0b6 (Unreleased) +**Breaking Changes** + +* `ServiceBusClient.close()` now closes spawned senders and receivers. ## 7.0.0b5 (2020-08-10) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index c0ad711e2590..7db284ee5b81 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -69,6 +69,7 @@ def __init__( self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name) # Internal flag for switching whether to apply connection sharing, pending fix in uamqp library self._connection_sharing = False + self._handlers = [] def __enter__(self): if self._connection_sharing: @@ -89,10 +90,15 @@ def _create_uamqp_connection(self): def close(self): # type: () -> None """ - Close down the ServiceBus client and the underlying connection. + Close down the ServiceBus client. + All spawned senders, receivers and underlying connection will be shutdown. :return: None """ + for handler in self._handlers: + handler.close() + self._handlers.clear() + if self._connection_sharing and self._connection: self._connection.destroy() @@ -157,7 +163,7 @@ def get_queue_sender(self, queue_name, **kwargs): """ # pylint: disable=protected-access - return ServiceBusSender( + handler = ServiceBusSender( fully_qualified_namespace=self.fully_qualified_namespace, queue_name=queue_name, credential=self._credential, @@ -168,6 +174,8 @@ def get_queue_sender(self, queue_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_queue_receiver(self, queue_name, **kwargs): # type: (str, Any) -> ServiceBusReceiver @@ -205,7 +213,7 @@ def get_queue_receiver(self, queue_name, **kwargs): """ # pylint: disable=protected-access - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, queue_name=queue_name, credential=self._credential, @@ -216,6 +224,8 @@ def get_queue_receiver(self, queue_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_queue_deadletter_receiver(self, queue_name, **kwargs): # type: (str, Any) -> ServiceBusReceiver @@ -265,7 +275,7 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs): queue_name=queue_name, transfer_deadletter=kwargs.get('transfer_deadletter', False) ) - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, entity_name=entity_name, credential=self._credential, @@ -277,6 +287,8 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_topic_sender(self, topic_name, **kwargs): # type: (str, Any) -> ServiceBusSender @@ -300,7 +312,7 @@ def get_topic_sender(self, topic_name, **kwargs): :caption: Create a new instance of the ServiceBusSender from ServiceBusClient. """ - return ServiceBusSender( + handler = ServiceBusSender( fully_qualified_namespace=self.fully_qualified_namespace, topic_name=topic_name, credential=self._credential, @@ -311,6 +323,8 @@ def get_topic_sender(self, topic_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): # type: (str, str, Any) -> ServiceBusReceiver @@ -353,7 +367,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): """ # pylint: disable=protected-access - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, topic_name=topic_name, subscription_name=subscription_name, @@ -365,6 +379,8 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs): # type: (str, str, Any) -> ServiceBusReceiver @@ -416,7 +432,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, ** subscription_name=subscription_name, transfer_deadletter=kwargs.get('transfer_deadletter', False) ) - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, entity_name=entity_name, credential=self._credential, @@ -428,6 +444,8 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, ** user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs): # type: (str, str, str, Any) -> ServiceBusReceiver @@ -473,7 +491,7 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi """ # pylint: disable=protected-access - return ServiceBusSessionReceiver( + handler = ServiceBusSessionReceiver( fully_qualified_namespace=self.fully_qualified_namespace, topic_name=topic_name, subscription_name=subscription_name, @@ -486,6 +504,8 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): # type: (str, str, Any) -> ServiceBusSessionReceiver @@ -526,7 +546,7 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): """ # pylint: disable=protected-access - return ServiceBusSessionReceiver( + handler = ServiceBusSessionReceiver( fully_qualified_namespace=self.fully_qualified_namespace, queue_name=queue_name, credential=self._credential, @@ -538,3 +558,5 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 066a82b9ea58..338a8102db2f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -71,6 +71,7 @@ def __init__( self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name) # Internal flag for switching whether to apply connection sharing, pending fix in uamqp library self._connection_sharing = False + self._handlers = [] async def __aenter__(self): if self._connection_sharing: @@ -133,9 +134,14 @@ async def close(self): # type: () -> None """ Close down the ServiceBus client. + All spawned senders, receivers and underlying connection will be shutdown. :return: None """ + for handler in self._handlers: + await handler.close() + self._handlers.clear() + if self._connection_sharing and self._connection: await self._connection.destroy_async() @@ -159,7 +165,7 @@ def get_queue_sender(self, queue_name, **kwargs): """ # pylint: disable=protected-access - return ServiceBusSender( + handler = ServiceBusSender( fully_qualified_namespace=self.fully_qualified_namespace, queue_name=queue_name, credential=self._credential, @@ -170,6 +176,8 @@ def get_queue_sender(self, queue_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_queue_receiver(self, queue_name, **kwargs): # type: (str, Any) -> ServiceBusReceiver @@ -206,7 +214,7 @@ def get_queue_receiver(self, queue_name, **kwargs): """ # pylint: disable=protected-access - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, queue_name=queue_name, credential=self._credential, @@ -217,6 +225,8 @@ def get_queue_receiver(self, queue_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_queue_deadletter_receiver(self, queue_name, **kwargs): # type: (str, Any) -> ServiceBusReceiver @@ -266,7 +276,7 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs): queue_name=queue_name, transfer_deadletter=kwargs.get('transfer_deadletter', False) ) - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, entity_name=entity_name, credential=self._credential, @@ -278,6 +288,8 @@ def get_queue_deadletter_receiver(self, queue_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_topic_sender(self, topic_name, **kwargs): # type: (str, Any) -> ServiceBusSender @@ -301,7 +313,7 @@ def get_topic_sender(self, topic_name, **kwargs): :caption: Create a new instance of the ServiceBusSender from ServiceBusClient. """ - return ServiceBusSender( + handler = ServiceBusSender( fully_qualified_namespace=self.fully_qualified_namespace, topic_name=topic_name, credential=self._credential, @@ -312,6 +324,8 @@ def get_topic_sender(self, topic_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): # type: (str, str, Any) -> ServiceBusReceiver @@ -354,7 +368,7 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): """ # pylint: disable=protected-access - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, topic_name=topic_name, subscription_name=subscription_name, @@ -366,6 +380,8 @@ def get_subscription_receiver(self, topic_name, subscription_name, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_subscription_deadletter_receiver(self, topic_name, subscription_name, **kwargs): # type: (str, str, Any) -> ServiceBusReceiver @@ -417,7 +433,7 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, ** subscription_name=subscription_name, transfer_deadletter=kwargs.get('transfer_deadletter', False) ) - return ServiceBusReceiver( + handler = ServiceBusReceiver( fully_qualified_namespace=self.fully_qualified_namespace, entity_name=entity_name, credential=self._credential, @@ -429,6 +445,8 @@ def get_subscription_deadletter_receiver(self, topic_name, subscription_name, ** user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_subscription_session_receiver(self, topic_name, subscription_name, session_id=None, **kwargs): # type: (str, str, str, Any) -> ServiceBusReceiver @@ -474,7 +492,7 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi """ # pylint: disable=protected-access - return ServiceBusSessionReceiver( + handler = ServiceBusSessionReceiver( fully_qualified_namespace=self.fully_qualified_namespace, topic_name=topic_name, subscription_name=subscription_name, @@ -487,6 +505,8 @@ def get_subscription_session_receiver(self, topic_name, subscription_name, sessi user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): # type: (str, str, Any) -> ServiceBusSessionReceiver @@ -526,7 +546,7 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): """ # pylint: disable=protected-access - return ServiceBusSessionReceiver( + handler = ServiceBusSessionReceiver( fully_qualified_namespace=self.fully_qualified_namespace, queue_name=queue_name, credential=self._credential, @@ -538,3 +558,5 @@ def get_queue_session_receiver(self, queue_name, session_id=None, **kwargs): user_agent=self._config.user_agent, **kwargs ) + self._handlers.append(handler) + return handler diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py new file mode 100644 index 000000000000..19f1ab9d6ebc --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py @@ -0,0 +1,58 @@ +#-------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + + +import logging +import pytest + +from azure.servicebus.aio import ServiceBusClient +from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer +from servicebus_preparer import CachedServiceBusNamespacePreparer, CachedServiceBusQueuePreparer +from utilities import get_logger + +_logger = get_logger(logging.DEBUG) + + +class ServiceBusClientAsyncTests(AzureMgmtTestCase): + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer() + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string) + + # context manager + async with client: + assert len(client._handlers) == 0 + sender = client.get_queue_sender(servicebus_queue.name) + receiver = client.get_queue_receiver(servicebus_queue.name) + await sender._open() + await receiver._open() + + assert sender._handler and sender._running + assert receiver._handler and receiver._running + assert len(client._handlers) == 2 + + assert not sender._handler and not sender._running + assert not receiver._handler and not receiver._running + assert len(client._handlers) == 0 + + # close operation + sender = client.get_queue_sender(servicebus_queue.name) + receiver = client.get_queue_receiver(servicebus_queue.name) + await sender._open() + await receiver._open() + + assert sender._handler and sender._running + assert receiver._handler and receiver._running + assert len(client._handlers) == 2 + + await client.close() + + assert not sender._handler and not sender._running + assert not receiver._handler and not receiver._running + assert len(client._handlers) == 0 diff --git a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py index 8f69482f0c4f..316ee1ef2550 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py @@ -28,7 +28,8 @@ ServiceBusTopicPreparer, ServiceBusQueuePreparer, ServiceBusNamespaceAuthorizationRulePreparer, - ServiceBusQueueAuthorizationRulePreparer + ServiceBusQueueAuthorizationRulePreparer, + CachedServiceBusQueuePreparer ) class ServiceBusClientTests(AzureMgmtTestCase): @@ -126,3 +127,43 @@ def test_sb_client_incorrect_queue_conn_str(self, servicebus_queue_authorization with pytest.raises(ServiceBusError): with client.get_queue_sender(wrong_queue.name) as sender: sender.send_messages(Message("test")) + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer() + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', dead_lettering_on_message_expiration=True) + def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string) + + # context manager + with client: + assert len(client._handlers) == 0 + sender = client.get_queue_sender(servicebus_queue.name) + receiver = client.get_queue_receiver(servicebus_queue.name) + sender._open() + receiver._open() + + assert sender._handler and sender._running + assert receiver._handler and receiver._running + assert len(client._handlers) == 2 + + assert not sender._handler and not sender._running + assert not receiver._handler and not receiver._running + assert len(client._handlers) == 0 + + # close operation + sender = client.get_queue_sender(servicebus_queue.name) + receiver = client.get_queue_receiver(servicebus_queue.name) + sender._open() + receiver._open() + + assert sender._handler and sender._running + assert receiver._handler and receiver._running + assert len(client._handlers) == 2 + + client.close() + + assert not sender._handler and not sender._running + assert not receiver._handler and not receiver._running + assert len(client._handlers) == 0 From a8a07820a43fd18a1ba734e3b76254c44749c8eb Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Wed, 12 Aug 2020 16:29:07 -0700 Subject: [PATCH 2/4] fix mypy --- .../azure-servicebus/azure/servicebus/_servicebus_client.py | 6 +++--- .../azure/servicebus/aio/_servicebus_client_async.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 7db284ee5b81..3e14bdccbb30 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -2,11 +2,11 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, TYPE_CHECKING +from typing import Any, List, TYPE_CHECKING import uamqp -from ._base_handler import _parse_conn_str, ServiceBusSharedKeyCredential +from ._base_handler import _parse_conn_str, ServiceBusSharedKeyCredential, BaseHandler from ._servicebus_sender import ServiceBusSender from ._servicebus_receiver import ServiceBusReceiver from ._servicebus_session_receiver import ServiceBusSessionReceiver @@ -69,7 +69,7 @@ def __init__( self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name) # Internal flag for switching whether to apply connection sharing, pending fix in uamqp library self._connection_sharing = False - self._handlers = [] + self._handlers = [] # type: List[BaseHandler] def __enter__(self): if self._connection_sharing: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 338a8102db2f..4c0acfa1a0f3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -2,12 +2,12 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -from typing import Any, TYPE_CHECKING +from typing import Any, List, TYPE_CHECKING import uamqp from .._base_handler import _parse_conn_str -from ._base_handler_async import ServiceBusSharedKeyCredential +from ._base_handler_async import ServiceBusSharedKeyCredential, BaseHandler from ._servicebus_sender_async import ServiceBusSender from ._servicebus_receiver_async import ServiceBusReceiver from ._servicebus_session_receiver_async import ServiceBusSessionReceiver @@ -71,7 +71,7 @@ def __init__( self._auth_uri = "{}/{}".format(self._auth_uri, self._entity_name) # Internal flag for switching whether to apply connection sharing, pending fix in uamqp library self._connection_sharing = False - self._handlers = [] + self._handlers = [] # type: List[BaseHandler] async def __aenter__(self): if self._connection_sharing: From 71de398b85d576075b5b2acddf1462f087f628ba Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Fri, 28 Aug 2020 11:26:24 -0700 Subject: [PATCH 3/4] update according to comments --- .../azure/servicebus/_servicebus_client.py | 12 +++++++++++- .../azure/servicebus/aio/_servicebus_client_async.py | 12 +++++++++++- .../tests/async_tests/test_sb_client_async.py | 2 ++ .../azure-servicebus/tests/test_sb_client.py | 2 ++ 4 files changed, 26 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 3e14bdccbb30..a5677f0b151c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from typing import Any, List, TYPE_CHECKING +import logging import uamqp @@ -16,6 +17,8 @@ if TYPE_CHECKING: from azure.core.credentials import TokenCredential +_LOGGER = logging.getLogger(__name__) + class ServiceBusClient(object): """The ServiceBusClient class defines a high level interface for @@ -96,7 +99,14 @@ def close(self): :return: None """ for handler in self._handlers: - handler.close() + try: + handler.close() + except Exception as exception: # pylint: disable=broad-except + _LOGGER.error( + "Client has met an exception when closing the handler: %r. Exception: %r.", + handler._container_id, # pylint: disable=protected-access + exception, + ) self._handlers.clear() if self._connection_sharing and self._connection: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index 4c0acfa1a0f3..bf34ba0c413d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from typing import Any, List, TYPE_CHECKING +import logging import uamqp @@ -18,6 +19,8 @@ if TYPE_CHECKING: from azure.core.credentials import TokenCredential +_LOGGER = logging.getLogger(__name__) + class ServiceBusClient(object): """The ServiceBusClient class defines a high level interface for @@ -139,7 +142,14 @@ async def close(self): :return: None """ for handler in self._handlers: - await handler.close() + try: + await handler.close() + except Exception as exception: # pylint: disable=broad-except + _LOGGER.error( + "Client has met an exception when closing the handler: %r. Exception: %r.", + handler._container_id, # pylint: disable=protected-access + exception, + ) self._handlers.clear() if self._connection_sharing and self._connection: diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py index 19f1ab9d6ebc..351d1fe51a99 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sb_client_async.py @@ -25,6 +25,8 @@ class ServiceBusClientAsyncTests(AzureMgmtTestCase): async def test_async_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string) + await client.close() + # context manager async with client: assert len(client._handlers) == 0 diff --git a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py index 316ee1ef2550..f9cf5fb0e306 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py @@ -136,6 +136,8 @@ def test_sb_client_incorrect_queue_conn_str(self, servicebus_queue_authorization def test_sb_client_close_spawned_handlers(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): client = ServiceBusClient.from_connection_string(servicebus_namespace_connection_string) + client.close() + # context manager with client: assert len(client._handlers) == 0 From a50cd278f9b763f578bdbb6f41669bfe2cf595dc Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Fri, 28 Aug 2020 13:51:07 -0700 Subject: [PATCH 4/4] python27 compatible --- .../azure-servicebus/azure/servicebus/_servicebus_client.py | 2 +- .../azure/servicebus/aio/_servicebus_client_async.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py index 787eafdbaf7e..be27075ea2de 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_client.py @@ -107,7 +107,7 @@ def close(self): handler._container_id, # pylint: disable=protected-access exception, ) - self._handlers.clear() + del self._handlers[:] if self._connection_sharing and self._connection: self._connection.destroy() diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py index eae7adb06f42..a6827a8ae91a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_client_async.py @@ -150,7 +150,7 @@ async def close(self): handler._container_id, # pylint: disable=protected-access exception, ) - self._handlers.clear() + del self._handlers[:] if self._connection_sharing and self._connection: await self._connection.destroy_async()