Skip to content

Commit

Permalink
feat: simplify StoredNotifAutopushUser's on_start
Browse files Browse the repository at this point in the history
to match AutopushUser's quicker initializion time when locust is spawning

and setup a quick matching shape class

Issue: SYNC-3916
  • Loading branch information
pjenvey committed Feb 14, 2024
1 parent 37b377a commit 0d0e99e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
9 changes: 9 additions & 0 deletions tests/load/locustfiles/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy
from locust import LoadTestShape, User
from locustfile import AutopushUser
from stored import StoredNotifAutopushUser

TickTuple = tuple[int, float, list[Type[User]]]

Expand Down Expand Up @@ -85,3 +86,11 @@ def tick(self) -> TickTuple | None:
spawn_rate: float = max(abs(users - self.get_current_user_count()), 1)

return users, spawn_rate, self.user_classes


class StoredNotifAutopushLoadTestShape(AutopushLoadTestShape):
"""A load test shape class for StoredNotifAutopushLoadTestShape
(Duration: 10 minutes, Users: 15000).
"""

user_classes: list[Type[User]] = [StoredNotifAutopushUser]
35 changes: 22 additions & 13 deletions tests/load/locustfiles/stored.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
from logging import Logger
from typing import Any, TypeAlias

import gevent
import websocket
from args import parse_wait_time
from exceptions import ZeroStatusRequestError
from gevent import Greenlet
from locust import FastHttpUser, events, task
from locust.exception import LocustError
from models import (
Expand Down Expand Up @@ -63,14 +65,16 @@ def __init__(self, environment) -> None:
self.unregister_records: list[RegisterRecord] = []
self.uaid: str = ""
self.ws: WebSocket | None = websocket.WebSocket()
self.ws_greenlet: Greenlet | None = None
self.initialized: bool = False

def wait_time(self):
"""Return the autopush wait time."""
return self.environment.autopush_wait_time(self)

def on_start(self) -> Any:
"""Call when a User starts running."""
self.connect_and_register()
self.ws_greenlet = gevent.spawn(self.connect)

def on_stop(self) -> Any:
"""Call when a User stops running."""
Expand All @@ -82,6 +86,8 @@ def on_stop(self) -> Any:
for channel_id in self.channels.keys():
self.send_unregister(self.ws, channel_id)
self.close()
if self.ws_greenlet:
gevent.kill(self.ws_greenlet)

def on_ws_open(self, ws: WebSocket) -> None:
"""Call when opening a WebSocket.
Expand Down Expand Up @@ -146,19 +152,22 @@ def on_ws_close(
if close_status_code or close_msg:
logger.info(f"WebSocket closed. status={close_status_code} msg={close_msg}")

@task(weight=78)
@task(weight=60)
def send_notification(self) -> None:
"""Send a notification to a registered endpoint while connected to Autopush."""
if not self.channels:
if not (self.initialized and self.channels):
logger.debug("Task 'send_notification' skipped.")
return

endpoint_url: str = random.choice(list(self.channels.values()))
self.post_notification(endpoint_url)

@task(weight=1)
@task(weight=5)
def connect_and_subscribe(self) -> None:
"""Connect, Subscribe a user to an Autopush channel, then disconnect."""
if not self.initialized:
logger.debug("Task 'connect_and_subscribe' skipped.")
return
if not self.ws:
self.connect_and_hello()
self.subscribe()
Expand All @@ -170,10 +179,10 @@ def subscribe(self):
self.send_register(self.ws, channel_id)
self.recv_message()

@task(weight=1)
@task(weight=5)
def connect_and_unsubscribe(self):
"""Connect, Unsubscribe a user to an Autopush channel, then disconnect."""
if not self.channels:
if not (self.initialized and self.channels):
logger.debug("Task 'unsubscribe' skipped.")
return

Expand All @@ -184,23 +193,23 @@ def connect_and_unsubscribe(self):
self.recv_message()
self.close()

@task(weight=20)
@task(weight=30)
def connect_and_read(self) -> None:
"""connect_and_hello then disconnect"""
if not self.initialized:
logger.debug("Task 'connect_and_read' skipped.")
return
self.connect_and_hello()
self.close()

def connect_and_register(self) -> None:
"""Initialize the WebSocket and Hello/Register initial channels"""
def connect(self) -> None:
"""Initialize the WebSocket, sending a Hello command"""
if not self.host:
raise LocustError("'host' value is unavailable.")

channel_count = random.randint(1, 3)

self.connect_and_hello()
for i in range(channel_count):
self.subscribe()
self.close()
self.initialized = True

def connect_and_hello(self) -> None:
"""Connect the WebSocket and complete the initial Hello handshake"""
Expand Down
4 changes: 2 additions & 2 deletions tests/load/setup_k8s.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ WORKER_COUNT=150
MACHINE_TYPE='c3d-standard-4' # 4 CPUs + 16GB Memory
BOLD=$(tput bold)
NORM=$(tput sgr0)
DIRECTORY=$(pwd)
LOAD_DIRECTORY=$(dirname $(realpath $0))

AUTOPUSH_DIRECTORY=$DIRECTORY/tests/load/kubernetes-config
AUTOPUSH_DIRECTORY=$LOAD_DIRECTORY/kubernetes-config
MASTER_FILE=locust-master-controller.yml
WORKER_FILE=locust-worker-controller.yml
SERVICE_FILE=locust-master-service.yml
Expand Down

0 comments on commit 0d0e99e

Please sign in to comment.