Skip to content

Commit

Permalink
fix: reset the WebSocket every time it disconnects (#607)
Browse files Browse the repository at this point in the history
mimicking WebSocketApp: as it appears WebSocket's internal buffers
aren't cleanly reset upon close

Issue: SYNC-3916
  • Loading branch information
pjenvey authored Feb 8, 2024
1 parent 04df79e commit 9bb964c
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions tests/load/locustfiles/stored.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, environment) -> None:
self.register_records: list[RegisterRecord] = []
self.unregister_records: list[RegisterRecord] = []
self.uaid: str = ""
self.ws: WebSocket = websocket.WebSocket()
self.ws: WebSocket | None = websocket.WebSocket()

def wait_time(self):
"""Return the autopush wait time."""
Expand All @@ -76,11 +76,12 @@ def on_stop(self) -> Any:
"""Call when a User stops running."""
if not self.channels:
return
if not self.ws.connected:
if not self.ws:
self.connect_and_hello()
assert self.ws
for channel_id in self.channels.keys():
self.send_unregister(self.ws, channel_id)
self.ws.close()
self.close()

def on_ws_open(self, ws: WebSocket) -> None:
"""Call when opening a WebSocket.
Expand Down Expand Up @@ -156,34 +157,38 @@ def send_notification(self):
self.post_notification(endpoint_url)

@task(weight=1)
def connect_and_subscribe(self):
"""Connect, Subscribe a user to an Autopush channel, then disconnect."""
if not self.ws:
self.connect_and_hello()
self.subscribe()
self.close()

def subscribe(self):
"""Subscribe a user to an Autopush channel."""
if not self.ws.connected:
self.connect_and_hello()
channel_id: str = str(uuid.uuid4())
self.send_register(self.ws, channel_id)
self.recv_message()
self.ws.close()

@task(weight=1)
def unsubscribe(self):
"""Unsubscribe a user from an Autopush channel."""
def connect_and_unsubscribe(self):
"""Connect, Unsubscribe a user to an Autopush channel, then disconnect."""
if not self.channels:
logger.debug("Task 'unsubscribe' skipped.")
return

if not self.ws.connected:
if not self.ws:
self.connect_and_hello()
channel_id: str = random.choice(list(self.channels.keys()))
self.send_unregister(self.ws, channel_id)
self.recv_message()
self.ws.close()
self.close()

@task(weight=20)
def connect_and_read(self) -> None:
"""connect_and_hello then disconnect"""
self.connect_and_hello()
self.ws.close()
self.close()

def connect_and_register(self) -> None:
"""Initialize the WebSocket and Hello/Register initial channels"""
Expand All @@ -192,14 +197,14 @@ def connect_and_register(self) -> None:

channel_count = random.randint(1, 3)

self.ws = websocket.WebSocket()
self.connect_and_hello()
for i in range(channel_count):
self.subscribe()
self.ws.close()
self.close()

def connect_and_hello(self) -> None:
"""Connect the WebSocket and complete the initial Hello handshake"""
self.ws = websocket.WebSocket()
self.ws.connect(self.host)
self.send_hello(self.ws)
self.recv_message()
Expand All @@ -209,12 +214,19 @@ def connect_and_hello(self) -> None:

def recv_message(self) -> None:
"""Receive and handle data from the WebSocket"""
assert self.ws
data = self.ws.recv()
if not isinstance(data, str):
logger.error("recv_message unexpectedly recieved bytes")
data = str(data)
self.on_ws_message(self.ws, data)

def close(self) -> None:
"""Close the WebSocket connection"""
if self.ws:
self.ws.close()
self.ws = None

def post_notification(self, endpoint_url: str) -> None:
"""Send a notification to Autopush.
Expand Down

0 comments on commit 9bb964c

Please sign in to comment.