Skip to content

Commit

Permalink
feat: simplify StoredNotifAutopushUser's on_start (#614)
Browse files Browse the repository at this point in the history
to match AutopushUser's quicker initializion time when locust is
spawning and setup a quick matching shape class.

Issue: SYNC-3916
  • Loading branch information
b4handjr authored Feb 15, 2024
2 parents 9954b33 + 5bb3c98 commit c284111
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
12 changes: 11 additions & 1 deletion tests/load/locustfiles/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy
from locust import LoadTestShape, User
from locustfile import AutopushUser
from stored import StoredNotifAutopushUser

TickTuple = tuple[int, float, list[Type[User]]]

Expand Down Expand Up @@ -43,7 +44,7 @@ def calculate_users(self, run_time: int) -> int:


class AutopushLoadTestShape(LoadTestShape):
"""A load test shape class for Autopush (Duration: 10 minutes, Users: 15000).
"""A load test shape class for Autopush (Duration: 10 minutes, Users: 150000).
Note: The Shape class assumes that the workers can support the generated spawn rates. Should
the number of available Locust workers change or should the Locust worker capacity change,
Expand Down Expand Up @@ -85,3 +86,12 @@ def tick(self) -> TickTuple | None:
spawn_rate: float = max(abs(users - self.get_current_user_count()), 1)

return users, spawn_rate, self.user_classes


class StoredNotifAutopushLoadTestShape(AutopushLoadTestShape):
"""A load test shape class for StoredNotifAutopushLoadTestShape
(Duration: 15 minutes, Users: 150000).
"""

MAX_RUN_TIME: int = 900 # 15 minutes
user_classes: list[Type[User]] = [StoredNotifAutopushUser]
43 changes: 28 additions & 15 deletions tests/load/locustfiles/stored.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
from logging import Logger
from typing import Any, TypeAlias

import gevent
import websocket
from args import parse_wait_time
from exceptions import ZeroStatusRequestError
from gevent import Greenlet
from locust import FastHttpUser, events, task
from locust.exception import LocustError
from models import (
Expand Down Expand Up @@ -63,14 +65,16 @@ def __init__(self, environment) -> None:
self.unregister_records: list[RegisterRecord] = []
self.uaid: str = ""
self.ws: WebSocket | None = websocket.WebSocket()
self.ws_greenlet: Greenlet | None = None
self.initialized: bool = False

def wait_time(self):
"""Return the autopush wait time."""
return self.environment.autopush_wait_time(self)

def on_start(self) -> Any:
"""Call when a User starts running."""
self.connect_and_register()
self.ws_greenlet = gevent.spawn(self.connect)

def on_stop(self) -> Any:
"""Call when a User stops running."""
Expand All @@ -82,6 +86,8 @@ def on_stop(self) -> Any:
for channel_id in self.channels.keys():
self.send_unregister(self.ws, channel_id)
self.close()
if self.ws_greenlet:
gevent.kill(self.ws_greenlet)

def on_ws_open(self, ws: WebSocket) -> None:
"""Call when opening a WebSocket.
Expand Down Expand Up @@ -146,19 +152,22 @@ def on_ws_close(
if close_status_code or close_msg:
logger.info(f"WebSocket closed. status={close_status_code} msg={close_msg}")

@task(weight=78)
@task(weight=60)
def send_notification(self) -> None:
"""Send a notification to a registered endpoint while connected to Autopush."""
if not self.channels:
if not (self.initialized and self.channels):
logger.debug("Task 'send_notification' skipped.")
return

endpoint_url: str = random.choice(list(self.channels.values()))
self.post_notification(endpoint_url)

@task(weight=1)
@task(weight=5)
def connect_and_subscribe(self) -> None:
"""Connect, Subscribe a user to an Autopush channel, then disconnect."""
"""Connect, subscribe a user to an Autopush channel, then disconnect."""
if not self.initialized:
logger.debug("Task 'connect_and_subscribe' skipped.")
return
if not self.ws:
self.connect_and_hello()
self.subscribe()
Expand All @@ -170,10 +179,10 @@ def subscribe(self):
self.send_register(self.ws, channel_id)
self.recv_message()

@task(weight=1)
@task(weight=5)
def connect_and_unsubscribe(self):
"""Connect, Unsubscribe a user to an Autopush channel, then disconnect."""
if not self.channels:
if not (self.initialized and self.channels):
logger.debug("Task 'unsubscribe' skipped.")
return

Expand All @@ -184,26 +193,30 @@ def connect_and_unsubscribe(self):
self.recv_message()
self.close()

@task(weight=20)
@task(weight=30)
def connect_and_read(self) -> None:
"""connect_and_hello then disconnect"""
if not self.initialized:
logger.debug("Task 'connect_and_read' skipped.")
return
self.connect_and_hello()
self.close()

def connect_and_register(self) -> None:
"""Initialize the WebSocket and Hello/Register initial channels"""
def connect(self) -> None:
"""Connect the WebSocket, send the initial 'hello' message, then disconnect.
This receives a new UAID from autoconnect which is used throughout the
rest of the test.
"""
if not self.host:
raise LocustError("'host' value is unavailable.")

channel_count = random.randint(1, 3)

self.connect_and_hello()
for i in range(channel_count):
self.subscribe()
self.close()
self.initialized = True

def connect_and_hello(self) -> None:
"""Connect the WebSocket and complete the initial Hello handshake"""
"""Connect, 'hello', then read any pending notifications"""
self.ws = websocket.WebSocket()
self.ws.connect(self.host)
self.send_hello(self.ws)
Expand Down
4 changes: 2 additions & 2 deletions tests/load/setup_k8s.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ WORKER_COUNT=150
MACHINE_TYPE='c3d-standard-4' # 4 CPUs + 16GB Memory
BOLD=$(tput bold)
NORM=$(tput sgr0)
DIRECTORY=$(pwd)
LOAD_DIRECTORY=$(dirname $(realpath $0))

AUTOPUSH_DIRECTORY=$DIRECTORY/tests/load/kubernetes-config
AUTOPUSH_DIRECTORY=$LOAD_DIRECTORY/kubernetes-config
MASTER_FILE=locust-master-controller.yml
WORKER_FILE=locust-worker-controller.yml
SERVICE_FILE=locust-master-service.yml
Expand Down

0 comments on commit c284111

Please sign in to comment.