Skip to content

Commit

Permalink
[SB Pyamqp] stress updates (#29783)
Browse files Browse the repository at this point in the history
* stress updates

* changes

* add memray to stress

* undo docker file changes

* add memray chaos

* timeoutError raise

* devred

* try log to file

* test indiv

* test indv

* updates

* tests

* logging_enable

* stress

* update

* delete

* remove changes to code

* change level

* update chart.yaml

* update to local running of indv components

* updates

* remove

* update

* update test base

* remove eh changes

* logging

* update jpb

* update docker

* update scenarios

* logging

---------

Co-authored-by: swathipil <[email protected]>
  • Loading branch information
l0lawrence and swathipil authored May 2, 2023
1 parent 328a3df commit 2c3fa6c
Show file tree
Hide file tree
Showing 15 changed files with 951 additions and 294 deletions.
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/stress/.helmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
stress
stress.exe
.env
Dockerfile
*.py
*.txt
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/stress/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.2.0
digest: sha256:59fff3930e78c4ca9f9c0120433c7695d31db63f36ac61d50abcc91b1f1835a0
generated: "2022-11-19T01:30:02.403917379Z"
digest: sha256:53cbe4c0fed047f6c611523bd34181b21a310e7a3a21cb14f649bb09e4a77648
generated: "2023-03-14T09:57:20.6731895-07:00"
6 changes: 3 additions & 3 deletions sdk/servicebus/azure-servicebus/stress/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
apiVersion: v2
name: python-servicebus-stress-test
name: py-sb-stress-test
description: python service bus stress test.
version: 0.1.2
appVersion: v0.2
Expand All @@ -9,5 +9,5 @@ annotations:

dependencies:
- name: stress-test-addons
version: ~0.2.0
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.2.0
repository: "@stress-test-charts"
4 changes: 3 additions & 1 deletion sdk/servicebus/azure-servicebus/stress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
# public OSS users should simply leave this argument blank or ignore its presence entirely
ARG REGISTRY="mcr.microsoft.com/mirror/docker/library/"
FROM ${REGISTRY}python:3.8-slim-buster
# Install if running off git branch
# RUN apt-get -y update && apt-get -y install git

WORKDIR /app

COPY ./scripts /app/stress/scripts

WORKDIR /app/stress/scripts
RUN pip3 install -r dev_requirements.txt
RUN pip install -r dev_requirements.txt
31 changes: 28 additions & 3 deletions sdk/servicebus/azure-servicebus/stress/scenarios-matrix.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
displayNames:
matrix:
image:
- Dockerfile
scenarios:
sbStress:
testTarget: servicebus
queue:
testTarget: queue
aqueue:
testTarget: aqueue
queuepull:
testTarget: queuepull
aqueuepull:
testTarget: aqueuepull
batch:
testTarget: batch
abatch:
testTarget: abatch
queuew:
testTarget: queuew
aqueuew:
testTarget: aqueuew
queuepullw:
testTarget: queuepullw
aqueuepullw:
testTarget: aqueuepullw
batchw:
testTarget: batchw
abatchw:
testTarget: abatchw
memray:
testTarget: memray
amemray:
testTarget: amemray
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
aiohttp>=3.0; python_version >= '3.5'
aiohttp>=3.0
opencensus-ext-azure
psutil
pytest
azure-servicebus
python-dotenv
websocket-client
70 changes: 15 additions & 55 deletions sdk/servicebus/azure-servicebus/stress/scripts/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,77 +11,37 @@
from opencensus.ext.azure.log_exporter import AzureLogHandler


def get_base_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
def get_base_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None,
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
logger = logging.getLogger(logger_name)
logger.setLevel(level)
formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')

if print_console:
console_handler = logging.StreamHandler(stream=sys.stdout)
if not logger.handlers:
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

if log_filename:
logger_file_handler = RotatingFileHandler(
log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
logger_file_handler.setFormatter(formatter)
logger.addHandler(logger_file_handler)

console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger


def get_logger(log_filename, logger_name, level=logging.INFO, print_console=False, log_format=None,
def get_logger(log_filename, logger_name, level=logging.ERROR, print_console=False, log_format=None,
log_file_max_bytes=20 * 1024 * 1024, log_file_backup_count=3):
stress_logger = logging.getLogger(logger_name)
stress_logger.setLevel(level)
eventhub_logger = logging.getLogger("azure.eventhub")
eventhub_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(level)
servicebus_logger = logging.getLogger("azure.servicebus")
servicebus_logger.setLevel(level)
pyamqp_logger = logging.getLogger("azure.servicebus._pyamqp")
pyamqp_logger.setLevel(level)

formatter = log_format or logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
if print_console:
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
if not eventhub_logger.handlers:
eventhub_logger.addHandler(console_handler)
if not uamqp_logger.handlers:
uamqp_logger.addHandler(console_handler)
if not stress_logger.handlers:
stress_logger.addHandler(console_handler)

if log_filename:
eventhub_file_handler = RotatingFileHandler(
"eventhub_" + log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
uamqp_file_handler = RotatingFileHandler(
"uamqp_" + log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
stress_file_handler = RotatingFileHandler(
log_filename,
maxBytes=log_file_max_bytes,
backupCount=log_file_backup_count
)
eventhub_file_handler.setFormatter(formatter)
uamqp_file_handler.setFormatter(formatter)
stress_file_handler.setFormatter(formatter)
eventhub_logger.addHandler(eventhub_file_handler)
uamqp_logger.addHandler(uamqp_file_handler)
stress_logger.addHandler(stress_file_handler)
console_handler = logging.FileHandler(log_filename)
console_handler.setFormatter(formatter)
servicebus_logger.addHandler(console_handler)
pyamqp_logger.addHandler(console_handler)
stress_logger.addHandler(console_handler)

return stress_logger


def get_azure_logger(logger_name, level=logging.INFO):
def get_azure_logger(logger_name, level=logging.ERROR):
logger = logging.getLogger("azure_logger_" + logger_name)
logger.setLevel(level)
# oc will automatically search for the ENV VAR 'APPLICATIONINSIGHTS_CONNECTION_STRING'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


class ProcessMonitor:
def __init__(self, log_filename, logger_name, log_interval=5.0, print_console=False,
def __init__(self, log_filename, logger_name, log_interval=30.0, print_console=False,
process_id=None, **kwargs):
"""
Process Monitor monitors the CPU usage, memory usage of a specific process.
Expand Down
18 changes: 14 additions & 4 deletions sdk/servicebus/azure-servicebus/stress/scripts/stress_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

import os
import asyncio
import configparser
from argparse import ArgumentParser
from datetime import timedelta
from dotenv import load_dotenv

from azure.servicebus import ServiceBusClient
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient
Expand All @@ -16,9 +18,7 @@
from app_insights_metric import AzureMonitorMetric
from process_monitor import ProcessMonitor

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

ENV_FILE = os.environ.get("ENV_FILE")

def sync_send(client, args):
azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Sender")
Expand Down Expand Up @@ -53,6 +53,9 @@ async def async_send(client, args):


def sync_receive(client, args):
config = configparser.ConfigParser()
config.read("./stress_runner.cfg")

azure_monitor_metric = AzureMonitorMetric("Sync ServiceBus Receiver")
process_monitor = ProcessMonitor("monitor_receiver_stress_sync.log", "receiver_stress_sync")
stress_test = StressTestRunner(
Expand Down Expand Up @@ -87,10 +90,14 @@ async def async_receive(client, args):


if __name__ == '__main__':
load_dotenv(dotenv_path=ENV_FILE, override=True)
parser = ArgumentParser()
parser.add_argument("--conn_str", help="ServiceBus connection string",
default=os.environ.get('SERVICE_BUS_CONNECTION_STR'))
parser.add_argument("--queue_name", help="The queue name.", default=os.environ.get("SERVICE_BUS_QUEUE_NAME"))
parser.add_argument("--method", type=str)
parser.add_argument("--duration", type=int, default=259200)
parser.add_argument("--logging-enable", action="store_true")
parser.add_argument("--logging_enable", action="store_true")

parser.add_argument("--send-batch-size", type=int, default=100)
parser.add_argument("--message-size", type=int, default=100)
Expand All @@ -102,6 +109,9 @@ async def async_receive(client, args):
args, _ = parser.parse_known_args()
loop = asyncio.get_event_loop()

CONNECTION_STR = args.conn_str
QUEUE_NAME= args.queue_name

if args.method.startswith("sync"):
sb_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
else:
Expand Down
Loading

0 comments on commit 2c3fa6c

Please sign in to comment.