Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: simplify StoredNotifAutopushUser's on_start #614

Merged
merged 6 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions tests/load/locustfiles/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy
from locust import LoadTestShape, User
from locustfile import AutopushUser
from stored import StoredNotifAutopushUser

TickTuple = tuple[int, float, list[Type[User]]]

Expand Down Expand Up @@ -85,3 +86,11 @@ def tick(self) -> TickTuple | None:
spawn_rate: float = max(abs(users - self.get_current_user_count()), 1)

return users, spawn_rate, self.user_classes


class StoredNotifAutopushLoadTestShape(AutopushLoadTestShape):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding StoredNotifAutopushUser to AutopushLoadTestShape seemed to force both classes usage during the load test. So I setup this subclass for now so we can focus on solely StoredNotifAutopushUser, we can revisit this later.

"""A load test shape class for StoredNotifAutopushLoadTestShape
(Duration: 10 minutes, Users: 15000).
pjenvey marked this conversation as resolved.
Show resolved Hide resolved
"""

user_classes: list[Type[User]] = [StoredNotifAutopushUser]
35 changes: 22 additions & 13 deletions tests/load/locustfiles/stored.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
from logging import Logger
from typing import Any, TypeAlias

import gevent
import websocket
from args import parse_wait_time
from exceptions import ZeroStatusRequestError
from gevent import Greenlet
from locust import FastHttpUser, events, task
from locust.exception import LocustError
from models import (
Expand Down Expand Up @@ -63,14 +65,16 @@ def __init__(self, environment) -> None:
self.unregister_records: list[RegisterRecord] = []
self.uaid: str = ""
self.ws: WebSocket | None = websocket.WebSocket()
self.ws_greenlet: Greenlet | None = None
self.initialized: bool = False

def wait_time(self):
"""Return the autopush wait time."""
return self.environment.autopush_wait_time(self)

def on_start(self) -> Any:
"""Call when a User starts running."""
self.connect_and_register()
self.ws_greenlet = gevent.spawn(self.connect)

def on_stop(self) -> Any:
"""Call when a User stops running."""
Expand All @@ -82,6 +86,8 @@ def on_stop(self) -> Any:
for channel_id in self.channels.keys():
self.send_unregister(self.ws, channel_id)
self.close()
if self.ws_greenlet:
gevent.kill(self.ws_greenlet)

def on_ws_open(self, ws: WebSocket) -> None:
"""Call when opening a WebSocket.
Expand Down Expand Up @@ -146,19 +152,22 @@ def on_ws_close(
if close_status_code or close_msg:
logger.info(f"WebSocket closed. status={close_status_code} msg={close_msg}")

@task(weight=78)
@task(weight=60)
def send_notification(self) -> None:
"""Send a notification to a registered endpoint while connected to Autopush."""
if not self.channels:
if not (self.initialized and self.channels):
logger.debug("Task 'send_notification' skipped.")
return

endpoint_url: str = random.choice(list(self.channels.values()))
self.post_notification(endpoint_url)

@task(weight=1)
@task(weight=5)
def connect_and_subscribe(self) -> None:
"""Connect, Subscribe a user to an Autopush channel, then disconnect."""
if not self.initialized:
logger.debug("Task 'connect_and_subscribe' skipped.")
return
if not self.ws:
self.connect_and_hello()
self.subscribe()
Expand All @@ -170,10 +179,10 @@ def subscribe(self):
self.send_register(self.ws, channel_id)
self.recv_message()

@task(weight=1)
@task(weight=5)
def connect_and_unsubscribe(self):
"""Connect, Unsubscribe a user to an Autopush channel, then disconnect."""
if not self.channels:
if not (self.initialized and self.channels):
logger.debug("Task 'unsubscribe' skipped.")
return

Expand All @@ -184,23 +193,23 @@ def connect_and_unsubscribe(self):
self.recv_message()
self.close()

@task(weight=20)
@task(weight=30)
def connect_and_read(self) -> None:
"""connect_and_hello then disconnect"""
if not self.initialized:
logger.debug("Task 'connect_and_read' skipped.")
return
self.connect_and_hello()
self.close()

def connect_and_register(self) -> None:
"""Initialize the WebSocket and Hello/Register initial channels"""
def connect(self) -> None:
pjenvey marked this conversation as resolved.
Show resolved Hide resolved
"""Initialize the WebSocket, sending a Hello command"""
if not self.host:
raise LocustError("'host' value is unavailable.")

channel_count = random.randint(1, 3)

self.connect_and_hello()
for i in range(channel_count):
self.subscribe()
self.close()
self.initialized = True

def connect_and_hello(self) -> None:
"""Connect the WebSocket and complete the initial Hello handshake"""
Expand Down
4 changes: 2 additions & 2 deletions tests/load/setup_k8s.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ WORKER_COUNT=150
MACHINE_TYPE='c3d-standard-4' # 4 CPUs + 16GB Memory
BOLD=$(tput bold)
NORM=$(tput sgr0)
DIRECTORY=$(pwd)
LOAD_DIRECTORY=$(dirname $(realpath $0))

AUTOPUSH_DIRECTORY=$DIRECTORY/tests/load/kubernetes-config
AUTOPUSH_DIRECTORY=$LOAD_DIRECTORY/kubernetes-config
MASTER_FILE=locust-master-controller.yml
WORKER_FILE=locust-worker-controller.yml
SERVICE_FILE=locust-master-service.yml
Expand Down