Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SB Pyamqp] stress updates #29783

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/stress/.helmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
stress
stress.exe
.env
Dockerfile
*.py
*.txt
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/stress/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.2.0
digest: sha256:59fff3930e78c4ca9f9c0120433c7695d31db63f36ac61d50abcc91b1f1835a0
generated: "2022-11-19T01:30:02.403917379Z"
digest: sha256:53cbe4c0fed047f6c611523bd34181b21a310e7a3a21cb14f649bb09e4a77648
generated: "2023-03-14T09:57:20.6731895-07:00"
6 changes: 3 additions & 3 deletions sdk/servicebus/azure-servicebus/stress/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
apiVersion: v2
name: python-servicebus-stress-test
name: py-sb-stress-test
description: python service bus stress test.
version: 0.1.2
appVersion: v0.2
Expand All @@ -9,5 +9,5 @@ annotations:

dependencies:
- name: stress-test-addons
version: ~0.2.0
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.2.0
repository: "@stress-test-charts"
4 changes: 3 additions & 1 deletion sdk/servicebus/azure-servicebus/stress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# public OSS users should simply leave this argument blank or ignore its presence entirely
ARG REGISTRY="mcr.microsoft.com/mirror/docker/library/"
FROM ${REGISTRY}python:3.8-slim-buster
# Install if running off git branch
# RUN apt-get -y update && apt-get -y install git

WORKDIR /app

COPY ./scripts /app/stress/scripts

WORKDIR /app/stress/scripts
RUN pip3 install -r dev_requirements.txt
RUN pip install -r dev_requirements.txt
31 changes: 28 additions & 3 deletions sdk/servicebus/azure-servicebus/stress/scenarios-matrix.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
displayNames:
matrix:
image:
- Dockerfile
scenarios:
sbStress:
testTarget: servicebus
queue:
testTarget: queue
aqueue:
testTarget: aqueue
queuepull:
testTarget: queuepull
aqueuepull:
testTarget: aqueuepull
batch:
testTarget: batch
abatch:
testTarget: abatch
queuew:
testTarget: queuew
aqueuew:
testTarget: aqueuew
queuepullw:
testTarget: queuepullw
aqueuepullw:
testTarget: aqueuepullw
batchw:
testTarget: batchw
abatchw:
testTarget: abatchw
memray:
testTarget: memray
kashifkhan marked this conversation as resolved.
Show resolved Hide resolved
amemray:
testTarget: amemray
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
aiohttp>=3.0; python_version >= '3.5'
aiohttp>=3.0
opencensus-ext-azure
psutil
pytest
azure-servicebus
python-dotenv
websocket-client
70 changes: 15 additions & 55 deletions sdk/servicebus/azure-servicebus/stress/scripts/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,77 +11,37 @@
from opencensus.ext.azure.log_exporter import AzureLogHandler


def get_base_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
def get_base_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None,
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
logger = logging.getLogger(logger_name)
logger.setLevel(level)
formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')

if print_console:
console_handler = logging.StreamHandler(stream=sys.stdout)
if not logger.handlers:
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

if log_filename:
logger_file_handler = RotatingFileHandler(
log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
logger_file_handler.setFormatter(formatter)
logger.addHandler(logger_file_handler)

console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger


def get_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
def get_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None,
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
stress_logger = logging.getLogger(logger_name)
stress_logger.setLevel(level)
eventhub_logger = logging.getLogger("azure.eventhub")
eventhub_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(level)
servicebus_logger = logging.getLogger("azure.servicebus")
servicebus_logger.setLevel(level)
pyamqp_logger = logging.getLogger("azure.servicebus._pyamqp")
pyamqp_logger.setLevel(level)

formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
if print_console:
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
if not eventhub_logger.handlers:
eventhub_logger.addHandler(console_handler)
if not uamqp_logger.handlers:
uamqp_logger.addHandler(console_handler)
if not stress_logger.handlers:
stress_logger.addHandler(console_handler)

if log_filename:
eventhub_file_handler = RotatingFileHandler(
"eventhub_" + log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
uamqp_file_handler = RotatingFileHandler(
"uamqp_" + log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
stress_file_handler = RotatingFileHandler(
log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
eventhub_file_handler.setFormatter(formatter)
uamqp_file_handler.setFormatter(formatter)
stress_file_handler.setFormatter(formatter)
eventhub_logger.addHandler(eventhub_file_handler)
uamqp_logger.addHandler(uamqp_file_handler)
stress_logger.addHandler(stress_file_handler)
console_handler = logging.FileHandler(log_filename)
console_handler.setFormatter(formatter)
servicebus_logger.addHandler(console_handler)
pyamqp_logger.addHandler(console_handler)
stress_logger.addHandler(console_handler)

return stress_logger


def get_azure_logger(logger_name, level=logging.INFO):
def get_azure_logger(logger_name, level=logging.ERROR):
logger = logging.getLogger("azure_logger_" + logger_name)
logger.setLevel(level)
# oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


class ProcessMonitor:
def __init__(self, log_filename, logger_name, log_interval=5.0, print_console=False,
def __init__(self, log_filename, logger_name, log_interval=30.0, print_console=False,
process_id=None, **kwargs):
"""
Process Monitor monitors the CPU usage, memory usage of a specific process.
Expand Down
18 changes: 14 additions & 4 deletions sdk/servicebus/azure-servicebus/stress/scripts/stress_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

import os
import asyncio
import configparser
from argparse import ArgumentParser
from datetime import timedelta
from dotenv import load_dotenv

from azure.servicebus import ServiceBusClient
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient
Expand All @@ -16,9 +18,7 @@
from app_insights_metric import AzureMonitorMetric
from process_monitor import ProcessMonitor

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

ENV_FILE = os.environ.get("ENV_FILE")

def sync_send(client, args):
azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Sender")
Expand Down Expand Up @@ -53,6 +53,9 @@ async def async_send(client, args):


def sync_receive(client, args):
config = configparser.ConfigParser()
config.read("./stress_runner.cfg")

azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Receiver")
process_monitor = ProcessMonitor("monitor_receiver_stress_sync.log", "receiver_stress_sync")
stress_test = StressTestRunner(
Expand Down Expand Up @@ -87,10 +90,14 @@ async def async_receive(client, args):


if __name__ == '__main__':
load_dotenv(dotenv_path=ENV_FILE, override=True)
parser = ArgumentParser()
parser.add_argument("--conn_str", help="ServiceBus connection string",
default=os.environ.get('SERVICE_BUS_CONNECTION_STR'))
parser.add_argument("--queue_name", help="The queue name.", default=os.environ.get("SERVICE_BUS_QUEUE_NAME"))
parser.add_argument("--method", type=str)
parser.add_argument("--duration", type=int, default=259200)
parser.add_argument("--logging-enable", action="store_true")
parser.add_argument("--logging_enable", action="store_true")

parser.add_argument("--send-batch-size", type=int, default=100)
parser.add_argument("--message-size", type=int, default=100)
Expand All @@ -102,6 +109,9 @@ async def async_receive(client, args):
args, _ = parser.parse_known_args()
loop = asyncio.get_event_loop()

CONNECTION_STR = args.conn_str
QUEUE_NAME= args.queue_name

if args.method.startswith("sync"):
sb_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
else:
Expand Down
Loading