diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/__init__.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/_test_base.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/_test_base.py deleted file mode 100644 index 654b7b40b532..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/_test_base.py +++ /dev/null @@ -1,153 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -import uuid -from urllib.parse import urlparse - -from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes - -from azure.servicebus import ServiceBusClient, ReceiveSettleMode, Message -from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient -from azure.servicebus.control_client import ServiceBusService -from azure.servicebus.control_client.models import Queue - -MAX_QUEUE_SIZE = 40960 - - -def parse_connection_string(conn_str): - conn_settings = [s.split("=", 1) for s in conn_str.split(";")] - conn_settings = dict(conn_settings) - shared_access_key = conn_settings.get('SharedAccessKey') - shared_access_key_name = conn_settings.get('SharedAccessKeyName') - endpoint = conn_settings.get('Endpoint') - parsed = urlparse(endpoint.rstrip('/')) - namespace = parsed.netloc.strip().split('.')[0] - return { - 'namespace': namespace, - 'endpoint': endpoint, - 'entity_path': conn_settings.get('EntityPath'), - 'shared_access_key_name': shared_access_key_name, - 'shared_access_key': shared_access_key - } - - -class _ServiceTest(PerfStressTest): - service_client = None - async_service_client = None - - def __init__(self, arguments): - super().__init__(arguments) - - connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") - if self.args.no_client_share: - self.service_client = ServiceBusClient.from_connection_string(connection_string) - self.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) - else: - if not _ServiceTest.service_client: - _ServiceTest.service_client = ServiceBusClient.from_connection_string(connection_string) - _ServiceTest.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) - self.service_client = _ServiceTest.service_client - self.async_service_client =_ServiceTest.async_service_client - - @staticmethod - def add_arguments(parser): - super(_ServiceTest, _ServiceTest).add_arguments(parser) - parser.add_argument('--message-size', nargs='?', type=int, help='Size of a single message. Defaults to 100 bytes', default=100) - parser.add_argument('--no-client-share', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False) - parser.add_argument('--num-messages', nargs='?', type=int, help='Number of messages to send or receive. Defaults to 100', default=100) - - -class _QueueTest(_ServiceTest): - queue_name = "perfstress-" + str(uuid.uuid4()) - queue_client = None - async_queue_client = None - - def __init__(self, arguments): - super().__init__(arguments) - connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") - connection_props = parse_connection_string(connection_string) - self.mgmt_client = ServiceBusService( - service_namespace=connection_props['namespace'], - shared_access_key_name=connection_props['shared_access_key_name'], - shared_access_key_value=connection_props['shared_access_key']) - - async def global_setup(self): - await super().global_setup() - queue = Queue(max_size_in_megabytes=MAX_QUEUE_SIZE) - self.mgmt_client.create_queue(self.queue_name, queue=queue) - - async def setup(self): - await super().setup() - # In T1, these operations check for the existance of the queue - # so must be created during setup, rather than in the constructor. - self.queue_client = self.service_client.get_queue(self.queue_name) - self.async_queue_client = self.async_service_client.get_queue(self.queue_name) - - async def global_cleanup(self): - self.mgmt_client.delete_queue(self.queue_name) - await super().global_cleanup() - - -class _SendTest(_QueueTest): - sender = None - async_sender = None - - async def setup(self): - await super().setup() - self.sender = self.queue_client.get_sender() - self.async_sender = self.async_queue_client.get_sender() - self.sender.open() - await self.async_sender.open() - - async def close(self): - self.sender.close() - await self.async_sender.close() - await super().close() - - -class _ReceiveTest(_QueueTest): - receiver = None - async_receiver = None - - async def global_setup(self): - await super().global_setup() - await self._preload_queue() - - async def setup(self): - await super().setup() - mode = ReceiveSettleMode.PeekLock if self.args.peeklock else ReceiveSettleMode.ReceiveAndDelete - self.receiver = self.queue_client.get_receiver( - mode=mode, - prefetch=self.args.num_messages, - idle_timeout=self.args.max_wait_time) - self.async_receiver = self.async_queue_client.get_receiver( - mode=mode, - prefetch=self.args.num_messages, - idle_timeout=self.args.max_wait_time) - self.receiver.open() - await self.async_receiver.open() - - async def _preload_queue(self): - data = get_random_bytes(self.args.message_size) - async_queue_client = self.async_service_client.get_queue(self.queue_name) - async with async_queue_client.get_sender() as sender: - for i in range(self.args.preload): - sender.queue_message(Message(data)) - if i % 1000 == 0: - print("Loaded {} messages".format(i)) - await sender.send_pending_messages() - await sender.send_pending_messages() - - async def close(self): - self.receiver.close() - await self.async_receiver.close() - await super().close() - - @staticmethod - def add_arguments(parser): - super(_ReceiveTest, _ReceiveTest).add_arguments(parser) - parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False) - parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0) - parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_batch.py deleted file mode 100644 index 171786f81144..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_batch.py +++ /dev/null @@ -1,27 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -import asyncio - -from ._test_base import _ReceiveTest - - -class LegacyReceiveMessageBatchTest(_ReceiveTest): - def run_sync(self): - count = 0 - while count < self.args.num_messages: - batch = self.receiver.fetch_next(max_batch_size=self.args.num_messages - count) - if self.args.peeklock: - for msg in batch: - msg.complete() - count += len(batch) - - async def run_async(self): - count = 0 - while count < self.args.num_messages: - batch = await self.async_receiver.fetch_next(max_batch_size=self.args.num_messages - count) - if self.args.peeklock: - await asyncio.gather(*[m.complete() for m in batch]) - count += len(batch) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message.py deleted file mode 100644 index 0e4c9fb79642..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message.py +++ /dev/null @@ -1,25 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -from ._test_base import _SendTest - -from azure_devtools.perfstress_tests import get_random_bytes - -from azure.servicebus import Message -from azure.servicebus.aio import Message as AsyncMessage - - -class LegacySendMessageTest(_SendTest): - def __init__(self, arguments): - super().__init__(arguments) - self.data = get_random_bytes(self.args.message_size) - - def run_sync(self): - message = Message(self.data) - self.sender.send(message) - - async def run_async(self): - message = AsyncMessage(self.data) - await self.async_sender.send(message) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message_batch.py deleted file mode 100644 index fc73cf0bb2d7..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/send_message_batch.py +++ /dev/null @@ -1,26 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -from ._test_base import _SendTest - -from azure_devtools.perfstress_tests import get_random_bytes - -from azure.servicebus import BatchMessage - - -class LegacySendMessageBatchTest(_SendTest): - def __init__(self, arguments): - super().__init__(arguments) - self.data = get_random_bytes(self.args.message_size) - - def run_sync(self): - messages = (self.data for _ in range(self.args.num_messages)) - batch = BatchMessage(messages) - self.sender.send(batch) - - async def run_async(self): - messages = (self.data for _ in range(self.args.num_messages)) - batch = BatchMessage(messages) - await self.async_sender.send(batch) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt b/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt deleted file mode 100644 index e07a582d0f19..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/t1_test_requirements.txt +++ /dev/null @@ -1 +0,0 @@ -azure-servicebus>=0.5,<1.0.0 diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py index f0ac79bf020e..b22328808983 100644 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/_test_base.py @@ -5,124 +5,265 @@ import uuid -from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes +from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes, BatchPerfTest -from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode, ServiceBusMessage +from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode, ServiceBusMessage, TransportType from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient from azure.servicebus.aio.management import ServiceBusAdministrationClient -MAX_QUEUE_SIZE = 40960 +class _ReceiveTest(): + def setup_servicebus_clients(self, transport_type, peeklock, num_messages, max_wait_time, uamqp_tranport) -> None: + self.connection_string=self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") + transport_type=TransportType.AmqpOverWebsocket if transport_type == 1 else TransportType.Amqp + mode=ServiceBusReceiveMode.PEEK_LOCK if peeklock else ServiceBusReceiveMode.RECEIVE_AND_DELETE + uamqp_tranport = uamqp_tranport if uamqp_tranport else False + self.servicebus_client=ServiceBusClient.from_connection_string( + self.connection_string, + receive_mode=mode, + prefetch_count=num_messages, + max_wait_time=max_wait_time or None, + transport_type=transport_type, + uamqp_transport=uamqp_tranport, + ) + self.async_servicebus_client=AsyncServiceBusClient.from_connection_string( + self.connection_string, + receive_mode=mode, + prefetch_count=num_messages, + max_wait_time=max_wait_time or None, + transport_type=transport_type, + uamqp_transport=uamqp_tranport, + ) + async def close_clients(self) -> None: + self.servicebus_client.close + await self.async_servicebus_client.close() + await super().close() + async def _preload_topic(self) -> None: + data=get_random_bytes(self.args.message_size) -class _ServiceTest(PerfStressTest): - service_client = None - async_service_client = None + current_topic_message_count = 0 - def __init__(self, arguments): - super().__init__(arguments) + async with ServiceBusAdministrationClient.from_connection_string(self.connection_string) as admin_client: + topic_properties = await admin_client.get_topic_runtime_properties(self.topic_name) + current_topic_message_count = topic_properties.scheduled_message_count + - connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") - if self.args.no_client_share: - self.service_client = ServiceBusClient.from_connection_string(connection_string) - self.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) - else: - if not _ServiceTest.service_client: - _ServiceTest.service_client = ServiceBusClient.from_connection_string(connection_string) - _ServiceTest.async_service_client = AsyncServiceBusClient.from_connection_string(connection_string) - self.service_client = _ServiceTest.service_client - self.async_service_client =_ServiceTest.async_service_client + print(f"The current topic {self.topic_name} has {current_topic_message_count} messages") + async with self.async_servicebus_client.get_topic_sender(self.topic_name) as sender: + batch = await sender.create_message_batch() - async def close(self): - self.service_client.close() - await self.async_service_client.close() - await super().close() + for i in range(current_topic_message_count, self.args.preload): + try: + batch.add_message(ServiceBusMessage(data)) + except ValueError: + await sender.send_messages(batch) + print(f"Loaded {i} messages") + batch = await sender.create_message_batch() + batch.add_message(ServiceBusMessage(data)) + + if len(batch): + await sender.send_messages(batch) + + async def _preload_queue(self) -> None: + data=get_random_bytes(self.args.message_size) + + current_queue_message_count = 0 + + async with ServiceBusAdministrationClient.from_connection_string(self.connection_string) as admin_client: + queue_properties = await admin_client.get_queue_runtime_properties(self.queue_name) + current_queue_message_count = queue_properties.active_message_count + + + print(f"The current queue {self.queue_name} has {current_queue_message_count} messages") + + async with self.async_servicebus_client.get_queue_sender(self.queue_name) as sender: + batch = await sender.create_message_batch() + + for i in range(current_queue_message_count, self.args.preload): + try: + batch.add_message(ServiceBusMessage(data)) + except ValueError: + await sender.send_messages(batch) + print(f"Loaded {i} messages") + batch = await sender.create_message_batch() + batch.add_message(ServiceBusMessage(data)) + if len(batch): + await sender.send_messages(batch) + @staticmethod - def add_arguments(parser): - super(_ServiceTest, _ServiceTest).add_arguments(parser) + def add_arguments(parser) -> None: parser.add_argument('--message-size', nargs='?', type=int, help='Size of a single message. Defaults to 100 bytes', default=100) - parser.add_argument('--no-client-share', action='store_true', help='Create one ServiceClient per test instance. Default is to share a single ServiceClient.', default=False) - parser.add_argument('--num-messages', nargs='?', type=int, help='Number of messages to send or receive. Defaults to 100', default=100) + parser.add_argument('--num-messages', nargs='?', type=int, help='Maximum number of messages to receive. Defaults to 100', default=100) + parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False) + parser.add_argument('--uamqp-transport', action="store_true", help="Switch to use uamqp transport. Default is False (pyamqp).", default=False) + parser.add_argument('--transport-type', nargs='?', type=int, help="Use Amqp (0) or Websocket (1) transport type. Default is Amqp.", default=0) + parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0) + parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000) +class _QueueReceiveTest(_ReceiveTest, PerfStressTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + self.setup_servicebus_clients( + arguments.transport_type, + arguments.peeklock, + arguments.num_messages, + arguments.max_wait_time, + arguments.uamqp_transport + ) + self.queue_name=self.get_from_env('AZURE_SERVICEBUS_QUEUE_NAME') + + self.receiver=self.servicebus_client.get_queue_receiver(self.queue_name) + self.async_receiver=self.async_servicebus_client.get_queue_receiver(self.queue_name) + + async def global_setup(self) -> None: + await super().global_setup() + await self._preload_queue() + + async def close(self) -> None: + self.receiver.close() + await self.async_receiver.close() + await self.close_clients() + await super().close() -class _QueueTest(_ServiceTest): - queue_name = "perfstress-" + str(uuid.uuid4()) + + - def __init__(self, arguments): +class _SubscriptionReceiveTest(_ReceiveTest, PerfStressTest): + def __init__(self, arguments) -> None: super().__init__(arguments) - connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") - self.async_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_string) + self.setup_servicebus_clients( + arguments.transport_type, + arguments.peeklock, + arguments.num_messages, + arguments.max_wait_time, + arguments.uamqp_transport + ) + self.topic_name=self.get_from_env('AZURE_SERVICEBUS_TOPIC_NAME') + self.subscription_name=self.get_from_env('AZURE_SERVICE_BUS_SUBSCRIPTION_NAME') - async def global_setup(self): + self.receiver=self.servicebus_client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=self.subscription_name) + self.async_receiver=self.async_servicebus_client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=self.subscription_name) + + async def global_setup(self) -> None: await super().global_setup() - await self.async_mgmt_client.create_queue(self.queue_name, max_size_in_megabytes=MAX_QUEUE_SIZE) + await self._preload_topic() + + async def close(self) -> None: + self.receiver.close() + await self.async_receiver.close() + await self.close_clients() + await super().close() + + +class _QueueReceiveBatchTest(_ReceiveTest, BatchPerfTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + self.setup_servicebus_clients( + arguments.transport_type, + arguments.peeklock, + arguments.num_messages, + arguments.max_wait_time, + arguments.uamqp_transport + ) + self.queue_name=self.get_from_env('AZURE_SERVICEBUS_QUEUE_NAME') + self.receiver=self.servicebus_client.get_queue_receiver(self.queue_name) + self.async_receiver=self.async_servicebus_client.get_queue_receiver(self.queue_name) + + async def global_setup(self) -> None: + await super().global_setup() + await self._preload_queue() + + async def close(self) -> None: + self.receiver.close() + await self.async_receiver.close() + await self.close_clients() + await super().close() + - async def global_cleanup(self): - await self.async_mgmt_client.delete_queue(self.queue_name) - await super().global_cleanup() +class _SubscriptionReceiveBatchTest(_ReceiveTest, BatchPerfTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + self.setup_servicebus_clients( + arguments.transport_type, + arguments.peeklock, + arguments.num_messages, + arguments.max_wait_time, + arguments.uamqp_transport + ) + + self.topic_name=self.get_from_env('AZURE_SERVICEBUS_TOPIC_NAME') + self.subscription_name=self.get_from_env('AZURE_SERVICE_BUS_SUBSCRIPTION_NAME') - async def close(self): - await self.async_mgmt_client.close() + self.receiver=self.servicebus_client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=self.subscription_name) + self.async_receiver=self.async_servicebus_client.get_subscription_receiver(topic_name=self.topic_name, subscription_name=self.subscription_name) + + async def global_setup(self) -> None: + await super().global_setup() + await self._preload_topic() + + async def close(self) -> None: + self.receiver.close() + await self.async_receiver.close() + await self.close_clients() + await super().close() + +class _SendTest(BatchPerfTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + transport_type=TransportType.AmqpOverWebsocket if arguments.transport_type == 1 else TransportType.Amqp + + self.connection_string=self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") + self.service_client=ServiceBusClient.from_connection_string( + self.connection_string, + transport_type=transport_type, + uamqp_transport=arguments.uamqp_transport, + ) + self.async_service_client=AsyncServiceBusClient.from_connection_string( + self.connection_string, + transport_type=transport_type, + uamqp_transport=arguments.uamqp_transport, + ) + async def close(self) -> None: + self.service_client.close() + await self.async_service_client.close() await super().close() -class _SendTest(_QueueTest): - def __init__(self, arguments): +class _SendQueueTest(_SendTest): + def __init__(self, arguments) -> None: super().__init__(arguments) - connection_string = self.get_from_env("AZURE_SERVICEBUS_CONNECTION_STRING") - self.async_mgmt_client = ServiceBusAdministrationClient.from_connection_string(connection_string) - self.sender = self.service_client.get_queue_sender(self.queue_name) - self.async_sender = self.async_service_client.get_queue_sender(self.queue_name) + + self.queue_name=self.get_from_env('AZURE_SERVICEBUS_QUEUE_NAME') + self.sender=self.service_client.get_queue_sender(self.queue_name) + self.async_sender=self.async_service_client.get_queue_sender(self.queue_name) + + async def setup(self) -> None: + await super().setup() + self.sender.create_message_batch() + await self.async_sender.create_message_batch() - async def close(self): + async def close(self) -> None: self.sender.close() await self.async_sender.close() await super().close() -class _ReceiveTest(_QueueTest): - def __init__(self, arguments): +class _SendTopicTest(_SendTest): + def __init__(self, arguments) -> None: super().__init__(arguments) - mode = ServiceBusReceiveMode.PEEK_LOCK if self.args.peeklock else ServiceBusReceiveMode.RECEIVE_AND_DELETE - self.receiver = self.service_client.get_queue_receiver( - queue_name=self.queue_name, - receive_mode=mode, - prefetch_count=self.args.num_messages, - max_wait_time=self.args.max_wait_time or None) - self.async_receiver = self.async_service_client.get_queue_receiver( - queue_name=self.queue_name, - receive_mode=mode, - prefetch_count=self.args.num_messages, - max_wait_time=self.args.max_wait_time or None) - async def _preload_queue(self): - data = get_random_bytes(self.args.message_size) - async with self.async_service_client.get_queue_sender(self.queue_name) as sender: - batch = await sender.create_message_batch() - for i in range(self.args.preload): - try: - batch.add_message(ServiceBusMessage(data)) - except ValueError: - # Batch full - await sender.send_messages(batch) - print("Loaded {} messages".format(i)) - batch = await sender.create_message_batch() - batch.add_message(ServiceBusMessage(data)) - await sender.send_messages(batch) + self.topic_name=self.get_from_env('AZURE_SERVICEBUS_TOPIC_NAME') + self.sender=self.service_client.get_topic_sender(self.topic_name) + self.async_sender=self.async_service_client.get_topic_sender(self.topic_name) - async def global_setup(self): - await super().global_setup() - await self._preload_queue() + async def setup(self) -> None: + await super().setup() + self.sender.create_message_batch() + await self.async_sender.create_message_batch() - async def close(self): - self.receiver.close() - await self.async_receiver.close() - await super().close() - - @staticmethod - def add_arguments(parser): - super(_ReceiveTest, _ReceiveTest).add_arguments(parser) - parser.add_argument('--peeklock', action='store_true', help='Receive using PeekLock mode and message settlement.', default=False) - parser.add_argument('--max-wait-time', nargs='?', type=int, help='Max time to wait for messages before closing. Defaults to 0.', default=0) - parser.add_argument('--preload', nargs='?', type=int, help='Number of messages to preload. Default is 10000.', default=10000) + async def close(self) -> None: + self.sender.close() + await self.async_sender.close() + await super().close() \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_batch.py deleted file mode 100644 index e46eb331bb16..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_batch.py +++ /dev/null @@ -1,31 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -import asyncio - -from ._test_base import _ReceiveTest - - -class ReceiveMessageBatchTest(_ReceiveTest): - def run_sync(self): - count = 0 - while count < self.args.num_messages: - batch = self.receiver.receive_messages( - max_message_count=self.args.num_messages - count, - max_wait_time=self.args.max_wait_time or None) - if self.args.peeklock: - for msg in batch: - self.receiver.complete_message(msg) - count += len(batch) - - async def run_async(self): - count = 0 - while count < self.args.num_messages: - batch = await self.async_receiver.receive_messages( - max_message_count=self.args.num_messages - count, - max_wait_time=self.args.max_wait_time or None) - if self.args.peeklock: - await asyncio.gather(*[self.async_receiver.complete_message(m) for m in batch]) - count += len(batch) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_queue_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_queue_message_batch.py new file mode 100644 index 000000000000..4c444713af21 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_queue_message_batch.py @@ -0,0 +1,29 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio + +from ._test_base import _QueueReceiveBatchTest + + +class ReceiveQueueMessageBatchTest(_QueueReceiveBatchTest): + def run_batch_sync(self) -> None: + batch = self.receiver.receive_messages( + max_message_count=self.args.num_messages, + max_wait_time=self.args.max_wait_time or None + ) + if self.args.peeklock: + for msg in batch: + self.receiver.complete_message(msg) + return len(batch) + + async def run_batch_async(self) -> None: + batch = await self.async_receiver.receive_messages( + max_message_count=self.args.num_messages, + max_wait_time=self.args.max_wait_time or None + ) + if self.args.peeklock: + await asyncio.gather(*[self.async_receiver.complete_message(m) for m in batch]) + return len(batch) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_stream.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_queue_message_stream.py similarity index 87% rename from sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_stream.py rename to sdk/servicebus/azure-servicebus/tests/perf_tests/receive_queue_message_stream.py index f9ef6473481b..8c291a92e136 100644 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_message_stream.py +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_queue_message_stream.py @@ -5,11 +5,11 @@ import asyncio -from ._test_base import _ReceiveTest +from ._test_base import _QueueReceiveTest -class ReceiveMessageStreamTest(_ReceiveTest): - def run_sync(self): +class ReceiveQueueMessageStreamTest(_QueueReceiveTest): + def run_sync(self) -> None: count = 0 if self.args.peeklock: for msg in self.receiver: @@ -23,7 +23,7 @@ def run_sync(self): break count += 1 - async def run_async(self): + async def run_async(self) -> None: count = 0 if self.args.peeklock: async for msg in self.async_receiver: diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_subscription_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_subscription_message_batch.py new file mode 100644 index 000000000000..58608d40aef0 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_subscription_message_batch.py @@ -0,0 +1,27 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import asyncio + +from ._test_base import _SubscriptionReceiveBatchTest + + +class ReceiveSubscriptionMessageBatchTest(_SubscriptionReceiveBatchTest): + def run_batch_sync(self) -> None: + batch = self.receiver.receive_messages( + max_message_count=self.args.num_messages, + max_wait_time=self.args.max_wait_time or None) + if self.args.peeklock: + for msg in batch: + self.receiver.complete_message(msg) + return len(batch) + + async def run_batch_async(self) -> None: + batch = await self.async_receiver.receive_messages( + max_message_count=self.args.num_messages, + max_wait_time=self.args.max_wait_time or None) + if self.args.peeklock: + await asyncio.gather(*[self.async_receiver.complete_message(m) for m in batch]) + return len(batch) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_stream.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_subscription_message_stream.py similarity index 76% rename from sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_stream.py rename to sdk/servicebus/azure-servicebus/tests/perf_tests/receive_subscription_message_stream.py index f740302e3c91..7c3ef5075936 100644 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/T1_legacy_tests/receive_message_stream.py +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/receive_subscription_message_stream.py @@ -5,34 +5,34 @@ import asyncio -from ._test_base import _ReceiveTest +from ._test_base import _SubscriptionReceiveTest -class LegacyReceiveMessageStreamTest(_ReceiveTest): - def run_sync(self): +class ReceiveSubscriptionMessageStreamTest(_SubscriptionReceiveTest): + def run_sync(self) -> None: count = 0 if self.args.peeklock: for msg in self.receiver: if count >= self.args.num_messages: break count += 1 - msg.complete() + self.receiver.complete_message(msg) else: for msg in self.receiver: if count >= self.args.num_messages: break count += 1 - async def run_async(self): + async def run_async(self) -> None: count = 0 if self.args.peeklock: async for msg in self.async_receiver: if count >= self.args.num_messages: break count += 1 - await msg.complete() + await self.async_receiver.complete_message(msg) else: async for msg in self.async_receiver: if count >= self.args.num_messages: break - count += 1 + count += 1 \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message.py deleted file mode 100644 index 03887562c5a2..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message.py +++ /dev/null @@ -1,23 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -from ._test_base import _SendTest - -from azure_devtools.perfstress_tests import get_random_bytes - -from azure.servicebus import ServiceBusMessage - -class SendMessageTest(_SendTest): - def __init__(self, arguments): - super().__init__(arguments) - self.data = get_random_bytes(self.args.message_size) - - def run_sync(self): - message = ServiceBusMessage(self.data) - self.sender.send_messages(message) - - async def run_async(self): - message = ServiceBusMessage(self.data) - await self.async_sender.send_messages(message) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message_batch.py deleted file mode 100644 index 78bb0bf8f669..000000000000 --- a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_message_batch.py +++ /dev/null @@ -1,40 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -from ._test_base import _SendTest - -from azure_devtools.perfstress_tests import get_random_bytes - -from azure.servicebus import ServiceBusMessage - - -class SendMessageBatchTest(_SendTest): - def __init__(self, arguments): - super().__init__(arguments) - self.data = get_random_bytes(self.args.message_size) - - def run_sync(self): - batch = self.sender.create_message_batch() - for i in range(self.args.num_messages): - try: - batch.add_message(ServiceBusMessage(self.data)) - except ValueError: - # Batch full - self.sender.send_messages(batch) - batch = self.sender.create_message_batch() - batch.add_message(ServiceBusMessage(self.data)) - self.sender.send_messages(batch) - - async def run_async(self): - batch = await self.async_sender.create_message_batch() - for i in range(self.args.num_messages): - try: - batch.add_message(ServiceBusMessage(self.data)) - except ValueError: - # Batch full - await self.async_sender.send_messages(batch) - batch = await self.async_sender.create_message_batch() - batch.add_message(ServiceBusMessage(self.data)) - await self.async_sender.send_messages(batch) diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_queue_message.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_queue_message.py new file mode 100644 index 000000000000..968e3cd86aef --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_queue_message.py @@ -0,0 +1,35 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _SendQueueTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import ServiceBusMessage + +class SendQueueMessageTest(_SendQueueTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_batch_sync(self) -> int: + if self.args.batch_size > 1: + self.sender.send_messages( + [ServiceBusMessage(self.data) for _ in range(self.args.batch_size)] + ) + else: + self.sender.send_messages(ServiceBusMessage(self.data)) + + return self.args.batch_size + + async def run_batch_async(self) -> int: + if self.args.batch_size > 1: + await self.sender.send_messages( + [ServiceBusMessage(self.data) for _ in range(self.args.batch_size)] + ) + else: + await self.sender.send_messages(ServiceBusMessage(self.data)) + + return self.args.batch_size \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_queue_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_queue_message_batch.py new file mode 100644 index 000000000000..1603b5cf2f63 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_queue_message_batch.py @@ -0,0 +1,31 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _SendQueueTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import ServiceBusMessage + + +class SendQueueMessageBatchTest(_SendQueueTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_batch_sync(self) -> int: + batch = self.sender.create_message_batch() + for _ in range(self.args.batch_size): + batch.add_message(ServiceBusMessage(self.data)) + self.sender.send_messages(batch) + return self.args.batch_size + + async def run_batch_async(self) -> int: + batch = await self.async_sender.create_message_batch() + for _ in range(self.args.batch_size): + batch.add_message(ServiceBusMessage(self.data)) + + await self.async_sender.send_messages(batch) + return self.args.batch_size \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_topic_message.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_topic_message.py new file mode 100644 index 000000000000..15b6cf96b1cd --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_topic_message.py @@ -0,0 +1,35 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _TopicTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import ServiceBusMessage + +class SendTopicMessageTest(_TopicTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_batch_sync(self) -> int: + if self.args.batch_size > 1: + self.sender.send_messages( + [ServiceBusMessage(self.data) for _ in range(self.args.batch_size)] + ) + else: + self.sender.send_messages(ServiceBusMessage(self.data)) + + return self.args.batch_size + + async def run_batch_async(self) -> int: + if self.args.batch_size > 1: + await self.sender.send_messages( + [ServiceBusMessage(self.data) for _ in range(self.args.batch_size)] + ) + else: + await self.sender.send_messages(ServiceBusMessage(self.data)) + + return self.args.batch_size \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/perf_tests/send_topic_message_batch.py b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_topic_message_batch.py new file mode 100644 index 000000000000..08267f78bdec --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/perf_tests/send_topic_message_batch.py @@ -0,0 +1,30 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._test_base import _SendTopicTest + +from azure_devtools.perfstress_tests import get_random_bytes + +from azure.servicebus import ServiceBusMessage + + +class SendTopicMessageBatchTest(_SendTopicTest): + def __init__(self, arguments) -> None: + super().__init__(arguments) + self.data = get_random_bytes(self.args.message_size) + + def run_batch_sync(self) -> int: + batch = self.sender.create_message_batch() + for _ in range(self.args.batch_size): + batch.add_message(ServiceBusMessage(self.data)) + self.sender.send_messages(batch) + return self.args.batch_size + + async def run_batch_async(self) -> int: + batch = await self.async_sender.create_message_batch() + for _ in range(self.args.batch_size): + batch.add_message(ServiceBusMessage(self.data)) + await self.async_sender.send_messages(batch) + return self.args.batch_size \ No newline at end of file diff --git a/sdk/servicebus/perf-resources.bicep b/sdk/servicebus/perf-resources.bicep new file mode 100644 index 000000000000..f8033a05b022 --- /dev/null +++ b/sdk/servicebus/perf-resources.bicep @@ -0,0 +1,74 @@ +param baseName string = resourceGroup().name +param location string = resourceGroup().location + +var serviceBusNamespaceName = 'sb-${baseName}' +var serviceBusQueueName = 'sb-${baseName}-queue' +var serviceBusTopicName = 'sb-${baseName}-topic' +var serviceBusSubscriptionName = 'sb-${baseName}-subscription' +var defaultSASKeyName = 'RootManageSharedAccessKey' +var authRuleResourceId = resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', serviceBusNamespaceName, defaultSASKeyName) +var sbVersion = '2017-04-01' + + + +resource serviceBusNamespace 'Microsoft.ServiceBus/namespaces@2017-04-01' = { + name: serviceBusNamespaceName + location: location + sku: { + name: 'Standard' + } + properties: {} +} + +resource serviceBusQueue 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { + name: '${serviceBusNamespaceName}/${serviceBusQueueName}' + properties: { + lockDuration: 'PT5M' + maxSizeInMegabytes: 4096 + requiresDuplicateDetection: false + requiresSession: false + defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' + deadLetteringOnMessageExpiration: false + duplicateDetectionHistoryTimeWindow: 'PT10M' + maxDeliveryCount: 10 + autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' + enablePartitioning: false + enableExpress: false + } + dependsOn: [ + serviceBusNamespace + ] +} + +resource serviceBusTopic 'Microsoft.ServiceBus/namespaces/topics@2017-04-01' = { + name: '${serviceBusNamespaceName}/${serviceBusTopicName}' + properties: { + autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' + defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' + duplicateDetectionHistoryTimeWindow: 'PT10M' + enableBatchedOperations: true + enableExpress: false + enablePartitioning: false + maxSizeInMegabytes: 4096 + requiresDuplicateDetection: false + status: 'Active' + supportOrdering: true + } + dependsOn: [ + serviceBusNamespace + ] +} + +resource serviceBusNamespace_serviceBusTopicName_serviceBusSubscriptionName 'Microsoft.ServiceBus/namespaces/topics/subscriptions@2017-04-01' = { + name: '${serviceBusNamespaceName}/${serviceBusTopicName}/${serviceBusSubscriptionName}' + properties: { + } + dependsOn: [ + serviceBusTopic + ] +} + + +output AZURE_SERVICEBUS_CONNECTION_STRING string = listkeys(authRuleResourceId, sbVersion).primaryConnectionString +output AZURE_SERVICEBUS_QUEUE_NAME string = serviceBusQueue.name +output AZURE_TOPIC_QUEUE_NAME string = serviceBusTopic.name