Skip to content

Commit

Permalink
test: add subscribe and unsubscribe tasks to AutopushUser
Browse files Browse the repository at this point in the history
  • Loading branch information
Trinaa committed Sep 28, 2023
1 parent 5f8efb2 commit 07d988d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 10 deletions.
64 changes: 54 additions & 10 deletions tests/load/locustfiles/locustfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
NotificationRecord,
RegisterMessage,
RegisterRecord,
UnregisterMessage,
)
from pydantic import ValidationError
from websocket import WebSocketApp, WebSocketConnectionClosedException

Message: TypeAlias = HelloMessage | NotificationMessage | RegisterMessage
Message: TypeAlias = HelloMessage | NotificationMessage | RegisterMessage | UnregisterMessage
Record: TypeAlias = HelloRecord | NotificationRecord | RegisterRecord

# Set to 'True' to view the verbose connection information for the web socket
Expand Down Expand Up @@ -85,6 +86,7 @@ def __init__(self, environment) -> None:
self.hello_record: HelloRecord | None = None
self.notification_records: list[NotificationRecord] = []
self.register_records: list[RegisterRecord] = []
self.unregister_records: list[RegisterRecord] = []
self.uaid: str = ""
self.ws: WebSocketApp | None = None
self.ws_greenlet: Greenlet | None = None
Expand All @@ -99,6 +101,8 @@ def on_start(self) -> Any:
def on_stop(self) -> Any:
"""Called when a User stops running."""
if self.ws:
for channel_id in self.channels.keys():
self.send_unregister(self.ws, channel_id)
self.ws.close()
self.ws = None
if self.ws_greenlet:
Expand All @@ -122,14 +126,12 @@ def on_ws_message(self, ws: WebSocketApp, data: str) -> None:
message: Message | None = self.recv(data)
if isinstance(message, HelloMessage):
self.uaid = message.uaid
# In order to assure notifications are sent on startup, users are
# required to be subscribed to a channel
if not self.channels:
self.send_register(ws)
elif isinstance(message, NotificationMessage):
self.send_ack(ws, message.channelID, message.version)
elif isinstance(message, RegisterMessage):
self.channels[message.channelID] = message.pushEndpoint
elif isinstance(message, UnregisterMessage):
del self.channels[message.channelID]

def on_ws_error(self, ws: WebSocketApp, error: Exception) -> None:
"""Called when there is a WebSocketApp error or if an exception is raised in a WebSocket
Expand Down Expand Up @@ -172,6 +174,26 @@ def send_notification(self):
endpoint_url: str = random.choice(list(self.channels.values()))
self.post_notification(endpoint_url)

@task(weight=1)
def subscribe(self):
"""Subscribes a user to an Autopush channel."""
if not self.ws:
logger.debug("Task 'subscribe' skipped.")
return

channel_id: str = str(uuid.uuid4())
self.send_register(self.ws, channel_id)

@task(weight=1)
def unsubscribe(self):
"""Unsubscribes a user from an Autopush channel."""
if not self.ws or not self.channels:
logger.debug("Task 'unsubscribe' skipped.")
return

channel_id: str = random.choice(list(self.channels.keys()))
self.send_unregister(self.ws, channel_id)

def connect(self) -> None:
"""Creates the WebSocketApp that will run indefinitely."""
self.ws = websocket.WebSocketApp(
Expand Down Expand Up @@ -250,9 +272,16 @@ def recv(self, data: str) -> Message | None:
)
case "register":
message = RegisterMessage(**message_dict)
message_channel_id: str = message.channelID # type: ignore[union-attr]
register_chid: str = message.channelID # type: ignore[union-attr]
record = next(
(r for r in self.register_records if r.channel_id == register_chid),
None,
)
case "unregister":
message = UnregisterMessage(**message_dict)
unregister_chid: str = message.channelID # type: ignore[union-attr]
record = next(
(r for r in self.register_records if r.channel_id == message_channel_id),
(r for r in self.unregister_records if r.channel_id == unregister_chid),
None,
)
case _:
Expand Down Expand Up @@ -318,27 +347,42 @@ def send_hello(self, ws: WebSocketApp) -> None:
self.hello_record = HelloRecord(send_time=time.perf_counter())
self.send(ws, message_type, data)

def send_register(self, ws: WebSocketApp) -> None:
def send_register(self, ws: WebSocketApp, channel_id: str) -> None:
"""Send a 'register' message to Autopush.
Args:
ws: WebSocket class object
channel_id: Notification message channel ID
Raises:
WebSocketException: Error raised by the WebSocket client
"""
message_type: str = "register"
channel_id: str = str(uuid.uuid4())
data: dict[str, Any] = dict(messageType=message_type, channelID=channel_id)
record = RegisterRecord(send_time=time.perf_counter(), channel_id=channel_id)
self.register_records.append(record)
self.send(ws, message_type, data)

def send_unregister(self, ws: WebSocketApp, channel_id: str) -> None:
"""Send an 'unregister' message to Autopush.
Args:
ws: WebSocket class object
channel_id: Notification message channel ID
Raises:
WebSocketException: Error raised by the WebSocket client
"""
message_type: str = "unregister"
data: dict[str, Any] = dict(messageType=message_type, channelID=channel_id)
record = RegisterRecord(send_time=time.perf_counter(), channel_id=channel_id)
self.unregister_records.append(record)
self.send(ws, message_type, data)

def send(self, ws: WebSocketApp, message_type: str, data: dict[str, Any]) -> None:
"""Send a message to Autopush.
Args:
ws: WebSocket class object
message_type: Message type. Examples: 'ack', 'hello' or 'register'
message_type: Message type. Examples: 'ack', 'hello', 'register' or 'unregister'
data: Message data
Raises:
WebSocketException: Error raised by the WebSocket client
Expand Down
8 changes: 8 additions & 0 deletions tests/load/locustfiles/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@ class RegisterRecord(BaseModel):

send_time: float
channel_id: str


class UnregisterMessage(BaseModel):
"""Autopush 'unregister' response message."""

messageType: Literal["unregister"]
channelID: str
status: Literal[200]

0 comments on commit 07d988d

Please sign in to comment.