diff --git a/tests/load/locustfiles/load.py b/tests/load/locustfiles/load.py index a988c68cf..584c67848 100644 --- a/tests/load/locustfiles/load.py +++ b/tests/load/locustfiles/load.py @@ -9,6 +9,7 @@ import numpy from locust import LoadTestShape, User from locustfile import AutopushUser +from stored import StoredNotifAutopushUser TickTuple = tuple[int, float, list[Type[User]]] @@ -43,7 +44,7 @@ def calculate_users(self, run_time: int) -> int: class AutopushLoadTestShape(LoadTestShape): - """A load test shape class for Autopush (Duration: 10 minutes, Users: 15000). + """A load test shape class for Autopush (Duration: 10 minutes, Users: 150000). Note: The Shape class assumes that the workers can support the generated spawn rates. Should the number of available Locust workers change or should the Locust worker capacity change, @@ -85,3 +86,12 @@ def tick(self) -> TickTuple | None: spawn_rate: float = max(abs(users - self.get_current_user_count()), 1) return users, spawn_rate, self.user_classes + + +class StoredNotifAutopushLoadTestShape(AutopushLoadTestShape): + """A load test shape class for StoredNotifAutopushLoadTestShape + (Duration: 15 minutes, Users: 150000). + """ + + MAX_RUN_TIME: int = 900 # 15 minutes + user_classes: list[Type[User]] = [StoredNotifAutopushUser] diff --git a/tests/load/locustfiles/stored.py b/tests/load/locustfiles/stored.py index ccce6fab9..7febf8af3 100644 --- a/tests/load/locustfiles/stored.py +++ b/tests/load/locustfiles/stored.py @@ -16,9 +16,11 @@ from logging import Logger from typing import Any, TypeAlias +import gevent import websocket from args import parse_wait_time from exceptions import ZeroStatusRequestError +from gevent import Greenlet from locust import FastHttpUser, events, task from locust.exception import LocustError from models import ( @@ -63,6 +65,8 @@ def __init__(self, environment) -> None: self.unregister_records: list[RegisterRecord] = [] self.uaid: str = "" self.ws: WebSocket | None = websocket.WebSocket() + self.ws_greenlet: Greenlet | None = None + self.initialized: bool = False def wait_time(self): """Return the autopush wait time.""" @@ -70,7 +74,7 @@ def wait_time(self): def on_start(self) -> Any: """Call when a User starts running.""" - self.connect_and_register() + self.ws_greenlet = gevent.spawn(self.connect) def on_stop(self) -> Any: """Call when a User stops running.""" @@ -82,6 +86,8 @@ def on_stop(self) -> Any: for channel_id in self.channels.keys(): self.send_unregister(self.ws, channel_id) self.close() + if self.ws_greenlet: + gevent.kill(self.ws_greenlet) def on_ws_open(self, ws: WebSocket) -> None: """Call when opening a WebSocket. @@ -146,19 +152,22 @@ def on_ws_close( if close_status_code or close_msg: logger.info(f"WebSocket closed. status={close_status_code} msg={close_msg}") - @task(weight=78) + @task(weight=60) def send_notification(self) -> None: """Send a notification to a registered endpoint while connected to Autopush.""" - if not self.channels: + if not (self.initialized and self.channels): logger.debug("Task 'send_notification' skipped.") return endpoint_url: str = random.choice(list(self.channels.values())) self.post_notification(endpoint_url) - @task(weight=1) + @task(weight=5) def connect_and_subscribe(self) -> None: - """Connect, Subscribe a user to an Autopush channel, then disconnect.""" + """Connect, subscribe a user to an Autopush channel, then disconnect.""" + if not self.initialized: + logger.debug("Task 'connect_and_subscribe' skipped.") + return if not self.ws: self.connect_and_hello() self.subscribe() @@ -170,10 +179,10 @@ def subscribe(self): self.send_register(self.ws, channel_id) self.recv_message() - @task(weight=1) + @task(weight=5) def connect_and_unsubscribe(self): """Connect, Unsubscribe a user to an Autopush channel, then disconnect.""" - if not self.channels: + if not (self.initialized and self.channels): logger.debug("Task 'unsubscribe' skipped.") return @@ -184,26 +193,30 @@ def connect_and_unsubscribe(self): self.recv_message() self.close() - @task(weight=20) + @task(weight=30) def connect_and_read(self) -> None: """connect_and_hello then disconnect""" + if not self.initialized: + logger.debug("Task 'connect_and_read' skipped.") + return self.connect_and_hello() self.close() - def connect_and_register(self) -> None: - """Initialize the WebSocket and Hello/Register initial channels""" + def connect(self) -> None: + """Connect the WebSocket, send the initial 'hello' message, then disconnect. + + This receives a new UAID from autoconnect which is used throughout the + rest of the test. + """ if not self.host: raise LocustError("'host' value is unavailable.") - channel_count = random.randint(1, 3) - self.connect_and_hello() - for i in range(channel_count): - self.subscribe() self.close() + self.initialized = True def connect_and_hello(self) -> None: - """Connect the WebSocket and complete the initial Hello handshake""" + """Connect, 'hello', then read any pending notifications""" self.ws = websocket.WebSocket() self.ws.connect(self.host) self.send_hello(self.ws) diff --git a/tests/load/setup_k8s.sh b/tests/load/setup_k8s.sh old mode 100644 new mode 100755 index cf1f1b217..34f411626 --- a/tests/load/setup_k8s.sh +++ b/tests/load/setup_k8s.sh @@ -14,9 +14,9 @@ WORKER_COUNT=150 MACHINE_TYPE='c3d-standard-4' # 4 CPUs + 16GB Memory BOLD=$(tput bold) NORM=$(tput sgr0) -DIRECTORY=$(pwd) +LOAD_DIRECTORY=$(dirname $(realpath $0)) -AUTOPUSH_DIRECTORY=$DIRECTORY/tests/load/kubernetes-config +AUTOPUSH_DIRECTORY=$LOAD_DIRECTORY/kubernetes-config MASTER_FILE=locust-master-controller.yml WORKER_FILE=locust-worker-controller.yml SERVICE_FILE=locust-master-service.yml