diff --git a/sdk/servicebus/azure-servicebus/stress/.helmignore b/sdk/servicebus/azure-servicebus/stress/.helmignore new file mode 100644 index 000000000000..61334a0f31ee --- /dev/null +++ b/sdk/servicebus/azure-servicebus/stress/.helmignore @@ -0,0 +1,6 @@ +stress +stress.exe +.env +Dockerfile +*.py +*.txt \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/stress/Chart.lock b/sdk/servicebus/azure-servicebus/stress/Chart.lock index f5c554172d82..15552cf4dea9 100644 --- a/sdk/servicebus/azure-servicebus/stress/Chart.lock +++ b/sdk/servicebus/azure-servicebus/stress/Chart.lock @@ -2,5 +2,5 @@ dependencies: - name: stress-test-addons repository: https://stresstestcharts.blob.core.windows.net/helm/ version: 0.2.0 -digest: sha256:59fff3930e78c4ca9f9c0120433c7695d31db63f36ac61d50abcc91b1f1835a0 -generated: "2022-11-19T01:30:02.403917379Z" +digest: sha256:53cbe4c0fed047f6c611523bd34181b21a310e7a3a21cb14f649bb09e4a77648 +generated: "2023-03-14T09:57:20.6731895-07:00" diff --git a/sdk/servicebus/azure-servicebus/stress/Chart.yaml b/sdk/servicebus/azure-servicebus/stress/Chart.yaml index 4f3a42c27ffd..c9401730fb50 100644 --- a/sdk/servicebus/azure-servicebus/stress/Chart.yaml +++ b/sdk/servicebus/azure-servicebus/stress/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v2 -name: python-servicebus-stress-test +name: py-sb-stress-test description: python service bus stress test. version: 0.1.2 appVersion: v0.2 @@ -9,5 +9,5 @@ annotations: dependencies: - name: stress-test-addons - version: ~0.2.0 - repository: https://stresstestcharts.blob.core.windows.net/helm/ + version: 0.2.0 + repository: "@stress-test-charts" diff --git a/sdk/servicebus/azure-servicebus/stress/Dockerfile b/sdk/servicebus/azure-servicebus/stress/Dockerfile index 61c94647f7ee..96439c796da8 100644 --- a/sdk/servicebus/azure-servicebus/stress/Dockerfile +++ b/sdk/servicebus/azure-servicebus/stress/Dockerfile @@ -2,10 +2,12 @@ # public OSS users should simply leave this argument blank or ignore its presence entirely ARG REGISTRY="mcr.microsoft.com/mirror/docker/library/" FROM ${REGISTRY}python:3.8-slim-buster +# Install if running off git branch +# RUN apt-get -y update && apt-get -y install git WORKDIR /app COPY ./scripts /app/stress/scripts WORKDIR /app/stress/scripts -RUN pip3 install -r dev_requirements.txt +RUN pip install -r dev_requirements.txt \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/stress/scenarios-matrix.yaml b/sdk/servicebus/azure-servicebus/stress/scenarios-matrix.yaml index 82879e81ed7c..04b654df2476 100644 --- a/sdk/servicebus/azure-servicebus/stress/scenarios-matrix.yaml +++ b/sdk/servicebus/azure-servicebus/stress/scenarios-matrix.yaml @@ -1,7 +1,32 @@ -displayNames: matrix: image: - Dockerfile scenarios: - sbStress: - testTarget: servicebus \ No newline at end of file + queue: + testTarget: queue + aqueue: + testTarget: aqueue + queuepull: + testTarget: queuepull + aqueuepull: + testTarget: aqueuepull + batch: + testTarget: batch + abatch: + testTarget: abatch + queuew: + testTarget: queuew + aqueuew: + testTarget: aqueuew + queuepullw: + testTarget: queuepullw + aqueuepullw: + testTarget: aqueuepullw + batchw: + testTarget: batchw + abatchw: + testTarget: abatchw + memray: + testTarget: memray + amemray: + testTarget: amemray diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/dev_requirements.txt b/sdk/servicebus/azure-servicebus/stress/scripts/dev_requirements.txt index d7928a902bf4..dc6a5d000de9 100644 --- a/sdk/servicebus/azure-servicebus/stress/scripts/dev_requirements.txt +++ b/sdk/servicebus/azure-servicebus/stress/scripts/dev_requirements.txt @@ -1,6 +1,6 @@ -aiohttp>=3.0; python_version >= '3.5' +aiohttp>=3.0 opencensus-ext-azure psutil -pytest azure-servicebus python-dotenv +websocket-client \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/logger.py b/sdk/servicebus/azure-servicebus/stress/scripts/logger.py index 0875c2e8a7e0..3199a5e59afb 100644 --- a/sdk/servicebus/azure-servicebus/stress/scripts/logger.py +++ b/sdk/servicebus/azure-servicebus/stress/scripts/logger.py @@ -11,77 +11,37 @@ from opencensus.ext.azure.log_exporter import AzureLogHandler -def get_base_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None, +def get_base_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None, log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3): logger = logging.getLogger(logger_name) logger.setLevel(level) formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - - if print_console: - console_handler = logging.StreamHandler(stream=sys.stdout) - if not logger.handlers: - console_handler.setFormatter(formatter) - logger.addHandler(console_handler) - - if log_filename: - logger_file_handler = RotatingFileHandler( - log_filename, - maxBytes=log_file_max_bytes, - backupCount=log_file_backup_count - ) - logger_file_handler.setFormatter(formatter) - logger.addHandler(logger_file_handler) - + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) return logger - -def get_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None, +def get_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None, log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3): stress_logger = logging.getLogger(logger_name) stress_logger.setLevel(level) - eventhub_logger = logging.getLogger("azure.eventhub") - eventhub_logger.setLevel(level) - uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(level) + servicebus_logger = logging.getLogger("azure.servicebus") + servicebus_logger.setLevel(level) + pyamqp_logger = logging.getLogger("azure.servicebus._pyamqp") + pyamqp_logger.setLevel(level) formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') - if print_console: - console_handler = logging.StreamHandler(stream=sys.stdout) - console_handler.setFormatter(formatter) - if not eventhub_logger.handlers: - eventhub_logger.addHandler(console_handler) - if not uamqp_logger.handlers: - uamqp_logger.addHandler(console_handler) - if not stress_logger.handlers: - stress_logger.addHandler(console_handler) - if log_filename: - eventhub_file_handler = RotatingFileHandler( - "eventhub_" + log_filename, - maxBytes=log_file_max_bytes, - backupCount=log_file_backup_count - ) - uamqp_file_handler = RotatingFileHandler( - "uamqp_" + log_filename, - maxBytes=log_file_max_bytes, - backupCount=log_file_backup_count - ) - stress_file_handler = RotatingFileHandler( - log_filename, - maxBytes=log_file_max_bytes, - backupCount=log_file_backup_count - ) - eventhub_file_handler.setFormatter(formatter) - uamqp_file_handler.setFormatter(formatter) - stress_file_handler.setFormatter(formatter) - eventhub_logger.addHandler(eventhub_file_handler) - uamqp_logger.addHandler(uamqp_file_handler) - stress_logger.addHandler(stress_file_handler) + console_handler = logging.FileHandler(log_filename) + console_handler.setFormatter(formatter) + servicebus_logger.addHandler(console_handler) + pyamqp_logger.addHandler(console_handler) + stress_logger.addHandler(console_handler) return stress_logger -def get_azure_logger(logger_name, level=logging.INFO): +def get_azure_logger(logger_name, level=logging.ERROR): logger = logging.getLogger("azure_logger_" + logger_name) logger.setLevel(level) # oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING' diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/process_monitor.py b/sdk/servicebus/azure-servicebus/stress/scripts/process_monitor.py index b59610a4331f..8e8178c5f56b 100644 --- a/sdk/servicebus/azure-servicebus/stress/scripts/process_monitor.py +++ b/sdk/servicebus/azure-servicebus/stress/scripts/process_monitor.py @@ -12,7 +12,7 @@ class ProcessMonitor: - def __init__(self, log_filename, logger_name, log_interval=5.0, print_console=False, + def __init__(self, log_filename, logger_name, log_interval=30.0, print_console=False, process_id=None, **kwargs): """ Process Monitor monitors the CPU usage, memory usage of a specific process. diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/stress_runner.py b/sdk/servicebus/azure-servicebus/stress/scripts/stress_runner.py index c1fe97d8bca1..fac7c5c8ab0e 100644 --- a/sdk/servicebus/azure-servicebus/stress/scripts/stress_runner.py +++ b/sdk/servicebus/azure-servicebus/stress/scripts/stress_runner.py @@ -6,8 +6,10 @@ import os import asyncio +import configparser from argparse import ArgumentParser from datetime import timedelta +from dotenv import load_dotenv from azure.servicebus import ServiceBusClient from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient @@ -16,9 +18,7 @@ from app_insights_metric import AzureMonitorMetric from process_monitor import ProcessMonitor -CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] -QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] - +ENV_FILE = os.environ.get("ENV_FILE") def sync_send(client, args): azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Sender") @@ -53,6 +53,9 @@ async def async_send(client, args): def sync_receive(client, args): + config = configparser.ConfigParser() + config.read("./stress_runner.cfg") + azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Receiver") process_monitor = ProcessMonitor("monitor_receiver_stress_sync.log", "receiver_stress_sync") stress_test = StressTestRunner( @@ -87,10 +90,14 @@ async def async_receive(client, args): if __name__ == '__main__': + load_dotenv(dotenv_path=ENV_FILE, override=True) parser = ArgumentParser() + parser.add_argument("--conn_str", help="ServiceBus connection string", + default=os.environ.get('SERVICE_BUS_CONNECTION_STR')) + parser.add_argument("--queue_name", help="The queue name.", default=os.environ.get("SERVICE_BUS_QUEUE_NAME")) parser.add_argument("--method", type=str) parser.add_argument("--duration", type=int, default=259200) - parser.add_argument("--logging-enable", action="store_true") + parser.add_argument("--logging_enable", action="store_true") parser.add_argument("--send-batch-size", type=int, default=100) parser.add_argument("--message-size", type=int, default=100) @@ -102,6 +109,9 @@ async def async_receive(client, args): args, _ = parser.parse_known_args() loop = asyncio.get_event_loop() + CONNECTION_STR = args.conn_str + QUEUE_NAME= args.queue_name + if args.method.startswith("sync"): sb_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) else: diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py b/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py index 4eda15b66126..67c016c9bfb3 100644 --- a/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py +++ b/sdk/servicebus/azure-servicebus/stress/scripts/stress_test_base.py @@ -9,6 +9,7 @@ from datetime import datetime, timedelta import concurrent import sys +import os import asyncio import logging @@ -20,15 +21,13 @@ from azure.servicebus import ServiceBusMessage, ServiceBusMessageBatch from azure.servicebus.exceptions import MessageAlreadySettled - -import logger +from logger import get_logger from app_insights_metric import AbstractMonitorMetric from process_monitor import ProcessMonitor -LOGFILE_NAME = "stress-test.log" +LOGFILE_NAME = os.environ.get("DEBUG_SHARE") + "output" PRINT_CONSOLE = True - -_logger = logger.get_base_logger(LOGFILE_NAME, "stress_test", logging.WARN) +_logger = get_logger(LOGFILE_NAME, "stress_test", logging.ERROR) class ReceiveType: @@ -44,11 +43,11 @@ def __init__(self): self.time_elapsed = None self.state_by_sender = {} self.state_by_receiver = {} + self.actual_size = 0 def __repr__(self): return str(vars(self)) - class StressTestRunnerState(object): """Per-runner state, e.g. if you spawn 3 senders each will have this as their state object, which will be coalesced at completion into StressTestResults""" @@ -64,11 +63,11 @@ def __init__(self): def __repr__(self): return str(vars(self)) - def populate_process_stats(self): + def populate_process_stats(self, monitor): self.timestamp = datetime.utcnow() try: self.cpu_percent = psutil.cpu_percent() - self.memory_bytes = psutil.virtual_memory().total + self.memory_bytes = psutil.virtual_memory().percent except NameError: return # psutil was not installed, fall back to simply not capturing these stats. @@ -81,22 +80,25 @@ def __init__( self, senders, receivers, + admin_client, duration=timedelta(minutes=15), receive_type=ReceiveType.push, send_batch_size=None, message_size=10, max_wait_time=10, - send_delay=0.01, + send_delay=1.0, receive_delay=0, should_complete_messages=True, - max_message_count=1, + max_message_count=10, send_session_id=None, fail_on_exception=True, azure_monitor_metric=None, process_monitor=None, + logging_level=logging.ERROR, ): self.senders = senders self.receivers = receivers + self.admin_client = admin_client self.duration = duration self.receive_type = receive_type self.message_size = message_size @@ -111,6 +113,7 @@ def __init__( self.azure_monitor_metric = azure_monitor_metric or AbstractMonitorMetric( "fake_test_name" ) + self.logging_level = logging_level self.process_monitor = process_monitor or ProcessMonitor( "monitor_{}".format(LOGFILE_NAME), "test_stress_queues", @@ -123,7 +126,7 @@ def __init__( self._duration_override = None for arg in sys.argv: - if arg.startswith("--stress_test_duration_seconds="): + if arg.startswith("--duration="): self._duration_override = timedelta(seconds=int(arg.split("=")[1])) self._should_stop = False @@ -161,10 +164,10 @@ def pre_process_message_body(self, payload): """Allows user to transform message payload before sending it.""" return payload - def _schedule_interval_logger(self, end_time, description="", interval_seconds=30): + def _schedule_interval_logger(self, end_time, description="", interval_seconds=300): def _do_interval_logging(): if end_time > datetime.utcnow() and not self._should_stop: - self._state.populate_process_stats() + self._state.populate_process_stats(self.process_monitor) _logger.critical( "{} RECURRENT STATUS: {}".format(description, self._state) ) @@ -194,14 +197,16 @@ def _construct_message(self): def _send(self, sender, end_time): self._schedule_interval_logger(end_time, "Sender " + str(self)) try: - _logger.info("STARTING SENDER") + _logger.debug("Starting send loop") + # log sender + _logger.debug("Sender: {}".format(sender)) with sender: while end_time > datetime.utcnow() and not self._should_stop: - _logger.info("SENDING") try: message = self._construct_message() if self.send_session_id != None: message.session_id = self.send_session_id + _logger.debug("Sending message: {}".format(message)) sender.send_messages(message) self.azure_monitor_metric.record_messages_cpu_memory( self.send_batch_size, @@ -213,6 +218,7 @@ def _send(self, sender, end_time): else: self._state.total_sent += 1 # send single message self.on_send(self._state, message, sender) + except Exception as e: _logger.exception("Exception during send: {}".format(e)) self.azure_monitor_metric.record_error(e) @@ -229,10 +235,11 @@ def _send(self, sender, end_time): def _receive(self, receiver, end_time): self._schedule_interval_logger(end_time, "Receiver " + str(self)) + # log receiver + _logger.debug("Receiver: {}".format(receiver)) try: with receiver: while end_time > datetime.utcnow() and not self._should_stop: - _logger.info("RECEIVE LOOP") try: if self.receive_type == ReceiveType.pull: batch = receiver.receive_messages( @@ -242,14 +249,19 @@ def _receive(self, receiver, end_time): elif self.receive_type == ReceiveType.push: receiver.max_wait_time = self.max_wait_time batch = receiver + # else: + # batch = [] for message in batch: + # log reciever + _logger.debug("Received message: {}".format(message)) self.on_receive(self._state, message, receiver) try: if self.should_complete_messages: receiver.complete_message(message) except MessageAlreadySettled: # It may have been settled in the plugin callback. pass + self._state.total_received += 1 # TODO: Get EnqueuedTimeUtc out of broker properties and calculate latency. Should properties/app properties be mostly None? if end_time <= datetime.utcnow(): @@ -267,7 +279,7 @@ def _receive(self, receiver, end_time): self.azure_monitor_metric.record_error(e) if self.fail_on_exception: raise - self._state.timestamp = datetime.utcnow() + self._state.timestamp = datetime.utcnow() return self._state except Exception as e: self.azure_monitor_metric.record_error(e) @@ -276,20 +288,29 @@ def _receive(self, receiver, end_time): raise def run(self): + start_time = datetime.utcnow() + if isinstance(self.duration, int): + self.duration = timedelta(seconds=self.duration) end_time = start_time + (self._duration_override or self.duration) + with self.process_monitor: with concurrent.futures.ThreadPoolExecutor(max_workers=4) as proc_pool: _logger.info("STARTING PROC POOL") - senders = [ - proc_pool.submit(self._send, sender, end_time) - for sender in self.senders - ] - receivers = [ - proc_pool.submit(self._receive, receiver, end_time) - for receiver in self.receivers - ] - + if self.senders: + senders = [ + proc_pool.submit(self._send, sender, end_time) + for sender in self.senders + ] + else: + senders = [] + if self.receivers: + receivers = [ + proc_pool.submit(self._receive, receiver, end_time) + for receiver in self.receivers + ] + else: + receivers = [] result = StressTestResults() for each in concurrent.futures.as_completed(senders + receivers): _logger.info("SOMETHING FINISHED") @@ -298,25 +319,28 @@ def run(self): if each in receivers: result.state_by_receiver[each] = each.result() # TODO: do as_completed in one batch to provide a way to short-circuit on failure. - result.state_by_sender = { - s: f.result() - for s, f in zip( - self.senders, concurrent.futures.as_completed(senders) + if self.senders: + result.state_by_sender = { + s: f.result() + for s, f in zip( + self.senders, concurrent.futures.as_completed(senders) + ) + } + _logger.info("Got receiver results") + result.total_sent = sum( + [r.total_sent for r in result.state_by_sender.values()] ) - } - result.state_by_receiver = { - r: f.result() - for r, f in zip( - self.receivers, concurrent.futures.as_completed(receivers) + if self.receivers: + result.state_by_receiver = { + r: f.result() + for r, f in zip( + self.receivers, concurrent.futures.as_completed(receivers) + ) + } + + result.total_received = sum( + [r.total_received for r in result.state_by_receiver.values()] ) - } - _logger.info("got receiver results") - result.total_sent = sum( - [r.total_sent for r in result.state_by_sender.values()] - ) - result.total_received = sum( - [r.total_received for r in result.state_by_receiver.values()] - ) result.time_elapsed = end_time - start_time _logger.critical("Stress test completed. Results:\n{}".format(result)) return result @@ -328,11 +352,12 @@ def __init__( senders, receivers, duration=timedelta(minutes=15), + admin_client=None, receive_type=ReceiveType.push, send_batch_size=None, message_size=10, max_wait_time=10, - send_delay=0.01, + send_delay=1.00, receive_delay=0, should_complete_messages=True, max_message_count=1, @@ -340,11 +365,13 @@ def __init__( fail_on_exception=True, azure_monitor_metric=None, process_monitor=None, + logging_level=logging.ERROR, ): super(StressTestRunnerAsync, self).__init__( senders, receivers, duration=duration, + admin_client=admin_client, receive_type=receive_type, send_batch_size=send_batch_size, message_size=message_size, @@ -357,15 +384,14 @@ def __init__( fail_on_exception=fail_on_exception, azure_monitor_metric=azure_monitor_metric, process_monitor=process_monitor, + logging_level=logging_level ) async def _send_async(self, sender, end_time): self._schedule_interval_logger(end_time, "Sender " + str(self)) try: - _logger.info("STARTING SENDER") async with sender: while end_time > datetime.utcnow() and not self._should_stop: - _logger.info("SENDING") try: message = self._construct_message() if self.send_session_id != None: @@ -376,7 +402,10 @@ async def _send_async(self, sender, end_time): self.process_monitor.cpu_usage_percent, self.process_monitor.memory_usage_percent, ) - self._state.total_sent += self.send_batch_size + if self.send_batch_size: + self._state.total_sent += self.send_batch_size + else: + self._state.total_sent += 1 self.on_send(self._state, message, sender) except Exception as e: _logger.exception("Exception during send: {}".format(e)) @@ -413,7 +442,6 @@ async def _receive_async(self, receiver, end_time): try: async with receiver: while end_time > datetime.utcnow() and not self._should_stop: - _logger.info("RECEIVE LOOP") try: if self.receive_type == ReceiveType.pull: batch = await receiver.receive_messages( @@ -450,31 +478,47 @@ async def _receive_async(self, receiver, end_time): async def run_async(self): start_time = datetime.utcnow() + if isinstance(self.duration, int): + self.duration = timedelta(seconds=self.duration) end_time = start_time + (self._duration_override or self.duration) - send_tasks = [ - asyncio.create_task(self._send_async(sender, end_time)) - for sender in self.senders - ] - receive_tasks = [ - asyncio.create_task(self._receive_async(receiver, end_time)) - for receiver in self.receivers - ] + if self.senders: + send_tasks = [ + asyncio.create_task(self._send_async(sender, end_time)) + for sender in self.senders + ] + else: + send_tasks = [] + if self.receivers: + receive_tasks = [ + asyncio.create_task(self._receive_async(receiver, end_time)) + for receiver in self.receivers + ] + else: + receive_tasks = [] with self.process_monitor: - await asyncio.gather(*send_tasks, *receive_tasks) + # await asyncio.gather(*send_tasks, *receive_tasks) + for task in asyncio.as_completed(send_tasks + receive_tasks): + try: + await task + except Exception as e: + print(e) result = StressTestResults() - result.state_by_sender = { - s: f.result() for s, f in zip(self.senders, send_tasks) - } - result.state_by_receiver = { - r: f.result() for r, f in zip(self.receivers, receive_tasks) - } - _logger.info("got receiver results") - result.total_sent = sum( + if self.senders: + result.state_by_sender = { + s: f.result() for s, f in zip(self.senders, send_tasks) + } + result.total_sent = sum( [r.total_sent for r in result.state_by_sender.values()] - ) - result.total_received = sum( - [r.total_received for r in result.state_by_receiver.values()] - ) + ) + if self.receivers: + result.state_by_receiver = { + r: f.result() for r, f in zip(self.receivers, receive_tasks) + } + _logger.info("got receiver results") + + result.total_received = sum( + [r.total_received for r in result.state_by_receiver.values()] + ) result.time_elapsed = end_time - start_time _logger.critical("Stress test completed. Results:\n{}".format(result)) return result diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/test_stress_queues.py b/sdk/servicebus/azure-servicebus/stress/scripts/test_stress_queues.py index 838bafbc4625..6288696d6cd0 100644 --- a/sdk/servicebus/azure-servicebus/stress/scripts/test_stress_queues.py +++ b/sdk/servicebus/azure-servicebus/stress/scripts/test_stress_queues.py @@ -1,147 +1,138 @@ -#------------------------------------------------------------------------- +#------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. #-------------------------------------------------------------------------- from datetime import timedelta +import logging import time import os -import pytest from dotenv import load_dotenv -#from argparse import ArgumentParser +from argparse import ArgumentParser -from azure.servicebus import AutoLockRenewer, ServiceBusClient +from azure.servicebus import AutoLockRenewer, ServiceBusClient, TransportType +from azure.servicebus.management import ServiceBusAdministrationClient from azure.servicebus._common.constants import ServiceBusReceiveMode from app_insights_metric import AzureMonitorMetric from stress_test_base import StressTestRunner, ReceiveType ENV_FILE = os.environ.get('ENV_FILE') -load_dotenv(dotenv_path=ENV_FILE, override=True) -LOGGING_ENABLE = False -SERVICE_BUS_CONNECTION_STR = os.environ.get('SERVICE_BUS_CONNECTION_STR') -SERVICEBUS_QUEUE_NAME = os.environ.get('SERVICE_BUS_QUEUE_NAME') -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_send_and_receive(): +def test_stress_queue_send_and_receive(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], - duration=timedelta(seconds=60), - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_receive") + admin_client = sb_admin_client, + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_receive"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") - -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_send_and_pull_receive(): +def test_stress_queue_send_and_pull_receive(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], + admin_client = sb_admin_client, receive_type=ReceiveType.pull, - duration=timedelta(seconds=60), - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_pull_receive") + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_pull_receive"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) - + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_batch_send_and_receive(): +def test_stress_queue_batch_send_and_receive(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], - receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], - duration=timedelta(seconds=60), + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, prefetch_count=5)], + admin_client = sb_admin_client, + duration=args.duration, send_batch_size=5, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_batch_send_and_receive") + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_batch_send_and_receive"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") - -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_slow_send_and_receive(): +def test_stress_queue_slow_send_and_receive(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], - duration=timedelta(seconds=3501*3), - send_delay=3500, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive") + admin_client = sb_admin_client, + duration=args.duration, + send_delay=(args.duration/3), + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) - + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_receive_and_delete(): +def test_stress_queue_receive_and_delete(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE)], + admin_client = sb_admin_client, should_complete_messages = False, - duration=timedelta(seconds=60), - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive") + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") - -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_unsettled_messages(): +def test_stress_queue_unsettled_messages(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], - duration = timedelta(seconds=350), + admin_client = sb_admin_client, + duration=args.duration, should_complete_messages = False, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_unsettled_messages") + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_unsettled_messages"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() # This test is prompted by reports of an issue where enough unsettled messages saturate a service-side cache # and prevent further receipt. - assert(result.total_sent > 2500) - assert(result.total_received > 2500) - + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_receive_large_batch_size(): +def test_stress_queue_receive_large_batch_size(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = StressTestRunner(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, prefetch_count=50)], - duration = timedelta(seconds=60), + admin_client = sb_admin_client, + duration = args.duration, max_message_count = 50, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_receive_large_batch_size") + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_receive_large_batch_size"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") # Cannot be defined at local scope due to pickling into multiproc runner. class ReceiverTimeoutStressTestRunner(StressTestRunner): @@ -151,24 +142,23 @@ def on_send(self, state, sent_message, sender): # To make receive time out, in push mode this delay would trigger receiver reconnection time.sleep(self.max_wait_time + 5) -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_pull_receive_timeout(): +def test_stress_queue_pull_receive_timeout(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = ReceiverTimeoutStressTestRunner( senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], + admin_client = sb_admin_client, max_wait_time = 5, receive_type=ReceiveType.pull, - duration=timedelta(seconds=600), - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_pull_receive_timeout") + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_pull_receive_timeout"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) - + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") class LongRenewStressTestRunner(StressTestRunner): def on_receive(self, state, received_message, receiver): @@ -177,23 +167,22 @@ def on_receive(self, state, received_message, receiver): renewer.register(receiver, received_message, max_lock_renewal_duration=300) time.sleep(300) -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_long_renew_send_and_receive(): +def test_stress_queue_long_renew_send_and_receive(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = LongRenewStressTestRunner( senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], - duration=timedelta(seconds=3000), + admin_client = sb_admin_client, + duration=args.duration, send_delay=300, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_send_and_receive") + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_send_and_receive"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) - + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") class LongSessionRenewStressTestRunner(StressTestRunner): def on_receive(self, state, received_message, receiver): @@ -203,25 +192,25 @@ def on_fail(renewable, error): print("FAILED AUTOLOCKRENEW: " + str(error)) renewer.register(receiver, receiver.session, max_lock_renewal_duration=600, on_lock_renew_failure=on_fail) -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_long_renew_session_send_and_receive(): +def test_stress_queue_long_renew_session_send_and_receive(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) session_id = 'test_stress_queue_long_renew_send_and_receive' stress_test = LongSessionRenewStressTestRunner( senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, session_id=session_id)], - duration=timedelta(seconds=3000), + admin_client = sb_admin_client, + duration=args.duration, send_delay=300, send_session_id=session_id, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_session_send_and_receive") + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_session_send_and_receive"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") class Peekon_receiveStressTestRunner(StressTestRunner): @@ -229,22 +218,23 @@ def on_receive_batch(self, state, received_message, receiver): '''Called on every successful receive''' assert receiver.peek_messages()[0] -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_peek_messages(): +def test_stress_queue_peek_messages(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = Peekon_receiveStressTestRunner( senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], - duration = timedelta(seconds=300), + admin_client = sb_admin_client, + duration=args.duration, receive_delay = 30, receive_type = ReceiveType.none, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_peek_messages") + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_peek_messages"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") # TODO: This merits better validation, to be implemented alongside full metric spread. @@ -261,24 +251,23 @@ def on_send(self, state, sent_message, sender): sender.__exit__() sender.__enter__() -@pytest.mark.liveTest -@pytest.mark.live_test_only -@pytest.mark.skip(reason='This test is disabled unless re-openability of handlers is desired and re-enabled') -def test_stress_queue_close_and_reopen(): +def test_stress_queue_close_and_reopen(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = RestartHandlerStressTestRunner( senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], - duration = timedelta(seconds=300), + admin_client = sb_admin_client, + duration = args.duration, receive_delay = 30, send_delay = 10, - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_close_and_reopen") + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_close_and_reopen"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") # This test validates that all individual messages are received contiguously over a long time period. # (e.g. not dropped for whatever reason, not sent, or not received) @@ -310,23 +299,96 @@ def pre_process_message_body(self, payload): return str(body) -@pytest.mark.liveTest -@pytest.mark.live_test_only -def test_stress_queue_check_for_dropped_messages(): +def test_stress_queue_check_for_dropped_messages(args): sb_client = ServiceBusClient.from_connection_string( - SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE) + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) stress_test = DroppedMessageCheckerStressTestRunner( senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME)], + admin_client = sb_admin_client, receive_type=ReceiveType.pull, - duration=timedelta(seconds=3000), - azure_monitor_metric=AzureMonitorMetric("test_stress_queue_check_for_dropped_messages") + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_check_for_dropped_messages"), + logging_level=LOGGING_LEVEL ) result = stress_test.run() - assert(result.total_sent > 0) - assert(result.total_received > 0) + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") if __name__ == '__main__': - #parser = ArgumentParser() - pytest.main() + load_dotenv(dotenv_path=ENV_FILE, override=True) + parser = ArgumentParser() + parser.add_argument("--conn_str", help="ServiceBus connection string", + default=os.environ.get('SERVICE_BUS_CONNECTION_STR')) + parser.add_argument("--queue_name", help="The queue name.", default="testQueue") + parser.add_argument("--method", type=str) + parser.add_argument("--duration", type=int, default=259200) + parser.add_argument("--logging-enable", action="store_true") + parser.add_argument("--print_console", action="store_true") + + parser.add_argument("--send-batch-size", type=int, default=100) + parser.add_argument("--message-size", type=int, default=100) + + parser.add_argument("--receive-type", type=str, default="pull") + parser.add_argument("--max_wait_time", type=int, default=10) + parser.add_argument("--max_message_count", type=int, default=1) + parser.add_argument("--uamqp_mode", action="store_true") + parser.add_argument("--transport", action="store_true") + parser.add_argument("--debug_level", help="Flag for setting a debug level, can be Info, Debug, Warning, Error or Critical", type=str, default="Error") + + args, _ = parser.parse_known_args() + + if args.transport: + TRANSPORT_TYPE= TransportType.AmqpOverWebsocket + else: + TRANSPORT_TYPE= TransportType.Amqp + + SERVICE_BUS_CONNECTION_STR = args.conn_str + SERVICEBUS_QUEUE_NAME= args.queue_name + LOGGING_ENABLE = args.logging_enable + LOGGING_LEVEL = getattr(logging, args.debug_level.upper(), None) + + sb_admin_client = ServiceBusAdministrationClient.from_connection_string(SERVICE_BUS_CONNECTION_STR) + + if args.method == "send_receive": + test_stress_queue_send_and_receive(args) + elif args.method == "send_pull_receive": + test_stress_queue_send_and_pull_receive(args) + elif args.method == "send_receive_batch": + test_stress_queue_batch_send_and_receive(args) + elif args.method == "send_receive_slow": + test_stress_queue_slow_send_and_receive(args) + elif args.method == "receive_delete": + test_stress_queue_receive_and_delete(args) + elif args.method == "unsettled_message": + test_stress_queue_unsettled_messages(args) + elif args.method == "large_batch": + test_stress_queue_receive_large_batch_size(args) + elif args.method == "pull_receive_timeout": + test_stress_queue_pull_receive_timeout(args) + elif args.method == "long_renew": + test_stress_queue_long_renew_send_and_receive(args) + elif args.method == "long_renew_session": + test_stress_queue_long_renew_session_send_and_receive(args) + elif args.method == "queue_peek": + test_stress_queue_peek_messages(args) + elif args.method == "queue_close_reopen": + test_stress_queue_close_and_reopen(args) + elif args.method == "dropped_messages": + test_stress_queue_check_for_dropped_messages(args) + else: + test_stress_queue_send_and_receive(args) + test_stress_queue_send_and_pull_receive(args) + test_stress_queue_batch_send_and_receive(args) + test_stress_queue_slow_send_and_receive(args) + test_stress_queue_receive_and_delete(args) + test_stress_queue_unsettled_messages(args) + test_stress_queue_receive_large_batch_size(args) + test_stress_queue_pull_receive_timeout(args) + test_stress_queue_long_renew_send_and_receive(args) + test_stress_queue_long_renew_session_send_and_receive(args) + test_stress_queue_peek_messages(args) + test_stress_queue_close_and_reopen(args) + test_stress_queue_check_for_dropped_messages(args) + diff --git a/sdk/servicebus/azure-servicebus/stress/scripts/test_stress_queues_async.py b/sdk/servicebus/azure-servicebus/stress/scripts/test_stress_queues_async.py new file mode 100644 index 000000000000..eb9f05765cf2 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/stress/scripts/test_stress_queues_async.py @@ -0,0 +1,399 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +from datetime import timedelta +import logging +import time +import os +import asyncio +from dotenv import load_dotenv +from argparse import ArgumentParser + +from azure.servicebus import AutoLockRenewer, TransportType +from azure.servicebus.aio import ServiceBusClient +from azure.servicebus.aio.management import ServiceBusAdministrationClient +from azure.servicebus._common.constants import ServiceBusReceiveMode +from app_insights_metric import AzureMonitorMetric + +from stress_test_base import StressTestRunnerAsync, ReceiveType + +ENV_FILE = os.environ.get('ENV_FILE') + + +async def test_stress_queue_send_and_receive(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = StressTestRunnerAsync(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_receive"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +async def test_stress_queue_send_and_pull_receive(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = StressTestRunnerAsync(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + receive_type=ReceiveType.pull, + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_send_and_pull_receive"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +async def test_stress_queue_batch_send_and_receive(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = StressTestRunnerAsync(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, prefetch_count=5, max_wait_time=10)], + admin_client = sb_admin_client, + duration=args.duration, + send_batch_size=5, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_batch_send_and_receive"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +async def test_stress_queue_slow_send_and_receive(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = StressTestRunnerAsync(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + duration=args.duration, + send_delay=(args.duration/3), + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +async def test_stress_queue_receive_and_delete(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = StressTestRunnerAsync(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE, max_wait_time=10)], + admin_client = sb_admin_client, + should_complete_messages = False, + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_slow_send_and_receive"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +async def test_stress_queue_unsettled_messages(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = StressTestRunnerAsync(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + duration=args.duration, + should_complete_messages = False, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_unsettled_messages"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + # This test is prompted by reports of an issue where enough unsettled messages saturate a service-side cache + # and prevent further receipt. + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +async def test_stress_queue_receive_large_batch_size(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = StressTestRunnerAsync(senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, prefetch_count=50, max_wait_time=10)], + admin_client = sb_admin_client, + duration = args.duration, + max_message_count = 50, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_receive_large_batch_size"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +# Cannot be async defined at local scope due to pickling into multiproc runner. +class ReceiverTimeoutStressTestRunner(StressTestRunnerAsync): + def on_send(self, state, sent_message, sender): + '''Called on every successful send''' + if state.total_sent % 10 == 0: + # To make receive time out, in push mode this delay would trigger receiver reconnection + time.sleep(self.max_wait_time + 5) + +async def test_stress_queue_pull_receive_timeout(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = ReceiverTimeoutStressTestRunner( + senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + max_wait_time = 5, + receive_type=ReceiveType.pull, + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_pull_receive_timeout"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +class LongRenewStressTestRunner(StressTestRunnerAsync): + def on_receive(self, state, received_message, receiver): + '''Called on every successful receive''' + renewer = AutoLockRenewer() + renewer.register(receiver, received_message, max_lock_renewal_duration=300) + time.sleep(300) + +async def test_stress_queue_long_renew_send_and_receive(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = LongRenewStressTestRunner( + senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + duration=args.duration, + send_delay=300, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_send_and_receive"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +class LongSessionRenewStressTestRunner(StressTestRunnerAsync): + def on_receive(self, state, received_message, receiver): + '''Called on every successful receive''' + renewer = AutoLockRenewer() + def on_fail(renewable, error): + print("FAILED AUTOLOCKRENEW: " + str(error)) + renewer.register(receiver, receiver.session, max_lock_renewal_duration=600, on_lock_renew_failure=on_fail) + +async def test_stress_queue_long_renew_session_send_and_receive(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + session_id = 'test_stress_queue_long_renew_send_and_receive' + + stress_test = LongSessionRenewStressTestRunner( + senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, session_id=session_id, max_wait_time=10)], + admin_client = sb_admin_client, + duration=args.duration, + send_delay=300, + send_session_id=session_id, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_long_renew_session_send_and_receive"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +class Peekon_receiveStressTestRunner(StressTestRunnerAsync): + def on_receive_batch(self, state, received_message, receiver): + '''Called on every successful receive''' + assert receiver.peek_messages()[0] + +async def test_stress_queue_peek_messages(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = Peekon_receiveStressTestRunner( + senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + duration=args.duration, + receive_delay = 30, + receive_type = ReceiveType.none, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_peek_messages"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + # TODO: This merits better validation, to be implemented alongside full metric spread. + + +class RestartHandlerStressTestRunner(StressTestRunnerAsync): + def post_receive(self, state, receiver): + '''Called after completion of every successful receive''' + if state.total_received % 3 == 0: + receiver.__exit__() + receiver.__enter__() + + def on_send(self, state, sent_message, sender): + '''Called after completion of every successful receive''' + if state.total_sent % 3 == 0: + sender.__exit__() + sender.__enter__() + +async def test_stress_queue_close_and_reopen(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = RestartHandlerStressTestRunner( + senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + duration = args.duration, + receive_delay = 30, + send_delay = 10, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_close_and_reopen"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +# This test validates that all individual messages are received contiguously over a long time period. +# (e.g. not dropped for whatever reason, not sent, or not received) +class DroppedMessageCheckerStressTestRunner(StressTestRunnerAsync): + def on_receive(self, state, received_message, receiver): + '''Called on every successful receive''' + last_seen = getattr(state, 'last_seen', -1) + noncontiguous = getattr(state, 'noncontiguous', set()) + body = int(str(received_message)) + if body == last_seen+1: + last_seen += 1 + if noncontiguous: + while (last_seen+1) in noncontiguous: + last_seen += 1 + noncontiguous.remove(last_seen) + else: + noncontiguous.add(body) + state.noncontiguous = noncontiguous + state.last_seen = last_seen + + def pre_process_message_body(self, payload): + '''Called when constructing message body''' + try: + body = self._message_id + except: + _message_id = 0 + body = 0 + _message_id += 1 + + return str(body) + +async def test_stress_queue_check_for_dropped_messages(args): + sb_client = ServiceBusClient.from_connection_string( + SERVICE_BUS_CONNECTION_STR, logging_enable=LOGGING_ENABLE, transport_type=TRANSPORT_TYPE) + stress_test = DroppedMessageCheckerStressTestRunner( + senders = [sb_client.get_queue_sender(SERVICEBUS_QUEUE_NAME)], + receivers = [sb_client.get_queue_receiver(SERVICEBUS_QUEUE_NAME, max_wait_time=10)], + admin_client = sb_admin_client, + receive_type=ReceiveType.pull, + duration=args.duration, + azure_monitor_metric=AzureMonitorMetric("test_stress_queue_check_for_dropped_messages"), + logging_level=LOGGING_LEVEL + ) + + result = await stress_test.run_async() + print(f"Total send {result.total_sent}") + print(f"Total received {result.total_received}") + +async def run(args): + if args.method == "send_receive": + await test_stress_queue_send_and_receive(args) + elif args.method == "send_pull_receive": + await test_stress_queue_send_and_pull_receive(args) + elif args.method == "send_receive_batch": + await test_stress_queue_batch_send_and_receive(args) + elif args.method == "send_receive_slow": + await test_stress_queue_slow_send_and_receive(args) + elif args.method == "receive_delete": + await test_stress_queue_receive_and_delete(args) + elif args.method == "unsettled_message": + await test_stress_queue_unsettled_messages(args) + elif args.method == "large_batch": + await test_stress_queue_receive_large_batch_size(args) + elif args.method == "pull_receive_timeout": + await test_stress_queue_pull_receive_timeout(args) + elif args.method == "long_renew": + await test_stress_queue_long_renew_send_and_receive(args) + elif args.method == "long_renew_session": + await test_stress_queue_long_renew_session_send_and_receive(args) + elif args.method == "queue_peek": + await test_stress_queue_peek_messages(args) + elif args.method == "queue_close_reopen": + await test_stress_queue_close_and_reopen(args) + elif args.method == "dropped_messages": + await test_stress_queue_check_for_dropped_messages(args) + else: + await test_stress_queue_send_and_receive(args) + await test_stress_queue_send_and_pull_receive(args) + await test_stress_queue_batch_send_and_receive(args) + await test_stress_queue_slow_send_and_receive(args) + await test_stress_queue_receive_and_delete(args) + await test_stress_queue_unsettled_messages(args) + await test_stress_queue_receive_large_batch_size(args) + await test_stress_queue_pull_receive_timeout(args) + await test_stress_queue_long_renew_send_and_receive(args) + await test_stress_queue_long_renew_session_send_and_receive(args) + await test_stress_queue_peek_messages(args) + await test_stress_queue_close_and_reopen(args) + await test_stress_queue_check_for_dropped_messages(args) + + +if __name__ == '__main__': + load_dotenv(dotenv_path=ENV_FILE, override=True) + parser = ArgumentParser() + parser.add_argument("--conn_str", help="ServiceBus connection string", + default=os.environ.get('SERVICE_BUS_CONNECTION_STR')) + parser.add_argument("--queue_name", help="The queue name.", default='testQueue') + parser.add_argument("--method", type=str) + parser.add_argument("--duration", type=int, default=259200) + parser.add_argument("--logging-enable", action="store_true") + parser.add_argument("--print_console", action="store_true") + + parser.add_argument("--send-batch-size", type=int, default=100) + parser.add_argument("--message-size", type=int, default=100) + + parser.add_argument("--receive-type", type=str, default="pull") + parser.add_argument("--max_wait_time", type=int, default=10) + parser.add_argument("--max_message_count", type=int, default=1) + parser.add_argument("--uamqp_mode", type=bool, default=False) + parser.add_argument("--transport", action="store_true") + parser.add_argument("--debug_level", help="Flag for setting a debug level, can be Info, Debug, Warning, Error or Critical", type=str, default="Error") + args, _ = parser.parse_known_args() + + if args.transport: + TRANSPORT_TYPE= TransportType.AmqpOverWebsocket + else: + TRANSPORT_TYPE= TransportType.Amqp + + SERVICE_BUS_CONNECTION_STR = args.conn_str + SERVICEBUS_QUEUE_NAME= args.queue_name + LOGGING_ENABLE = args.logging_enable + LOGGING_LEVEL = getattr(logging, args.debug_level.upper(), None) + + sb_admin_client = ServiceBusAdministrationClient.from_connection_string(SERVICE_BUS_CONNECTION_STR) + loop = asyncio.get_event_loop() + loop.run_until_complete(run(args)) + + diff --git a/sdk/servicebus/azure-servicebus/stress/templates/network_loss.yaml b/sdk/servicebus/azure-servicebus/stress/templates/network_loss.yaml deleted file mode 100644 index a1e64b80ab9c..000000000000 --- a/sdk/servicebus/azure-servicebus/stress/templates/network_loss.yaml +++ /dev/null @@ -1,25 +0,0 @@ -{{- include "stress-test-addons.chaos-wrapper.tpl" (list . "stress.python-sb-network") -}} -{{- define "stress.python-sb-network" -}} -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -spec: - scheduler: - cron: '@every 30s' - duration: '10s' - action: loss - direction: to - externalTargets: - - '{{ .Stress.ResourceGroupName }}.servicebus.windows.net' - mode: one - selector: - labelSelectors: - testInstance: "servicebus-{{ .Release.Name }}-{{ .Release.Revision }}" - chaos: 'true' - namespaces: - - {{ .Release.Namespace }} - podPhaseSelectors: - - 'Running' - loss: - loss: '100' - correlation: '100' -{{- end -}} \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/stress/templates/testjob.yaml b/sdk/servicebus/azure-servicebus/stress/templates/testjob.yaml index b673fae60587..9d46259360de 100644 --- a/sdk/servicebus/azure-servicebus/stress/templates/testjob.yaml +++ b/sdk/servicebus/azure-servicebus/stress/templates/testjob.yaml @@ -2,9 +2,9 @@ {{- define "stress.python-sb-stress" -}} metadata: labels: - testName: "deploy-python-sb-stress" - testInstance: "servicebus-{{ .Release.Name }}-{{ .Release.Revision }}" - chaos: "true" + testName: "py-sb-stress" + testInstance: "sb-{{ .Release.Name }}-{{ .Release.Revision }}" + chaos: "{{ default false .Stress.chaos }}" spec: containers: - name: python-sb-stress @@ -15,9 +15,63 @@ spec: memory: "2000Mi" cpu: "1" - {{ if eq .Stress.testTarget "sbStress" }} - command: ['bash', '-c', 'python3 test_stress_queues.py'] + {{ if eq .Stress.testTarget "aqueuew" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output && python test_stress_queues.py --method send_receive --duration 300000 --logging-enable --transport'] + {{- end -}} + + {{ if eq .Stress.testTarget "queuew" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output && python test_stress_queues.py --method send_receive --duration 300000 --logging-enable --transport'] + {{- end -}} + + {{ if eq .Stress.testTarget "aqueuepullw" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues_async.py --method send_pull_receive --duration 300000 --logging-enable --transport'] + {{- end -}} + + {{ if eq .Stress.testTarget "queuepullw" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues.py --method send_pull_receive --duration 300000 --logging-enable --transport'] + {{- end -}} + + {{ if eq .Stress.testTarget "abatchw" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues_async.py --method send_receive_batch --duration 300000 --logging-enable --transport'] + {{- end -}} + + {{ if eq .Stress.testTarget "batchw" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues.py --method send_receive_batch --duration 300000 --logging-enable --transport'] + {{- end -}} + + {{ if eq .Stress.testTarget "aqueue" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues_async.py --method send_receive --duration 300000 --logging-enable'] + {{- end -}} + + {{ if eq .Stress.testTarget "queue" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues.py --method send_receive --duration 300000 --logging-enable'] + {{- end -}} + + {{ if eq .Stress.testTarget "aqueuepull" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues_async.py --method send_pull_receive --duration 300000 --logging-enable'] + {{- end -}} + + {{ if eq .Stress.testTarget "queuepull" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues.py --method send_pull_receive --duration 300000 --logging-enable --output $DEBUG_SHARE/output.bin'] + {{- end -}} + + {{ if eq .Stress.testTarget "abatch" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues_async.py --method send_receive_batch --duration 300000 --logging-enable --output $DEBUG_SHARE/output.bin'] + {{- end -}} + + {{ if eq .Stress.testTarget "batch" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && cat > $DEBUG_SHARE/output.bin && python test_stress_queues.py --method send_receive_batch --duration 300000 --logging-enable --output $DEBUG_SHARE/output.bin'] + {{- end -}} + + {{ if eq .Stress.testTarget "amemray" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && memray run --output $DEBUG_SHARE/sb_async_memray_output.bin test_stress_queues_async.py --method send_pull_receive --duration 300000 --logging-enable'] + {{- end -}} + + {{ if eq .Stress.testTarget "memray" }} + command: ['bash', '-c', 'mkdir -p $DEBUG_SHARE && memray run --output $DEBUG_SHARE/sb_memray_output.bin test_stress_queues.py --method send_pull_receive --duration 300000 --logging-enable'] {{- end -}} {{- include "stress-test-addons.container-env" . | nindent 6 }} {{- end -}} + + diff --git a/sdk/servicebus/azure-servicebus/stress/test-resources.bicep b/sdk/servicebus/azure-servicebus/stress/test-resources.bicep new file mode 100644 index 000000000000..2bc46e309699 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/stress/test-resources.bicep @@ -0,0 +1,120 @@ +@description('The base resource name.') +param baseName string = resourceGroup().name + +@description('The client OID to grant access to test resources.') +param testApplicationOid string + +var apiVersion = '2017-04-01' +var location = resourceGroup().location +var authorizationRuleName_var = '${baseName}/RootManageSharedAccessKey' +var authorizationRuleNameNoManage_var = '${baseName}/NoManage' +var serviceBusDataOwnerRoleId = '/subscriptions/${subscription().subscriptionId}/providers/Microsoft.Authorization/roleDefinitions/090c5cfd-751d-490a-894a-3ce6f1109419' + +var sbPremiumName = 'sb-premium-${baseName}' + +resource servicebus 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = { + name: baseName + location: location + sku: { + name: 'Standard' + tier: 'Standard' + } + properties: { + zoneRedundant: false + } +} + +resource servicebusPremium 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = { + name: sbPremiumName + location: location + sku: { + name: 'Premium' + tier: 'Premium' + } +} + + +resource authorizationRuleName 'Microsoft.ServiceBus/namespaces/AuthorizationRules@2015-08-01' = { + name: authorizationRuleName_var + location: location + properties: { + rights: [ + 'Listen' + 'Manage' + 'Send' + ] + } + dependsOn: [ + servicebus + ] +} + +resource authorizationRuleNameNoManage 'Microsoft.ServiceBus/namespaces/AuthorizationRules@2015-08-01' = { + name: authorizationRuleNameNoManage_var + location: location + properties: { + rights: [ + 'Listen' + 'Send' + ] + } + dependsOn: [ + servicebus + ] +} + + + +resource dataOwnerRoleId 'Microsoft.Authorization/roleAssignments@2018-01-01-preview' = { + name: guid('dataOwnerRoleId${baseName}') + properties: { + roleDefinitionId: serviceBusDataOwnerRoleId + principalId: testApplicationOid + } + dependsOn: [ + servicebus + ] +} + +resource testQueue 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { + parent: servicebus + name: 'testQueue' + properties: { + lockDuration: 'PT5M' + maxSizeInMegabytes: 1024 + requiresDuplicateDetection: false + requiresSession: false + defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' + deadLetteringOnMessageExpiration: false + duplicateDetectionHistoryTimeWindow: 'PT10M' + maxDeliveryCount: 10 + autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' + enablePartitioning: false + enableExpress: false + } +} + +resource testQueueWithSessions 'Microsoft.ServiceBus/namespaces/queues@2017-04-01' = { + parent: servicebus + name: 'testQueueWithSessions' + properties: { + lockDuration: 'PT5M' + maxSizeInMegabytes: 1024 + requiresDuplicateDetection: false + requiresSession: true + defaultMessageTimeToLive: 'P10675199DT2H48M5.4775807S' + deadLetteringOnMessageExpiration: false + duplicateDetectionHistoryTimeWindow: 'PT10M' + maxDeliveryCount: 10 + autoDeleteOnIdle: 'P10675199DT2H48M5.4775807S' + enablePartitioning: false + enableExpress: false + } +} + +output SERVICEBUS_CONNECTION_STRING string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString +output SERVICEBUS_CONNECTION_STRING_NO_MANAGE string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', baseName, 'NoManage'), apiVersion).primaryConnectionString +output SERVICEBUS_CONNECTION_STRING_PREMIUM string = listKeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', sbPremiumName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString +output SERVICEBUS_ENDPOINT string = replace(replace(servicebus.properties.serviceBusEndpoint, ':443/', ''), 'https://', '') +output QUEUE_NAME string = 'testQueue' +output QUEUE_NAME_WITH_SESSIONS string = 'testQueueWithSessions'