Skip to content

Commit

Permalink
Make receive method more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
gregorjerse committed Jun 13, 2024
1 parent 150259f commit 372ad13
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 23 deletions.
37 changes: 29 additions & 8 deletions resolwe/flow/executors/socket_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ class PeerStatus(Enum):
UNRESPONSIVE = "unresponsive"


class ReceiveStatus(Enum):
"""Status of the received method."""

OK = "OK"
INVALID_MESSAGE = "invalid"
CANCELLED = "cancelled"
SOCKET_CLOSED = "socket_closed"
TERMINATED = "terminated"


PeerIdentity = bytes
MessageDataType = TypeVar("MessageDataType")
ResponseDataType = TypeVar("ResponseDataType")
Expand Down Expand Up @@ -563,7 +573,9 @@ def call_command(*args):
else:
return call_command

async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]:
async def _receive_message(
self,
) -> Tuple[ReceiveStatus, Optional[Tuple[PeerIdentity, Message]]]:
"""Receive a single message.
This method is blocking: it waits for the message to arrive. The
Expand All @@ -581,6 +593,7 @@ async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]:
the key "type".
"""
result = None
received_status = ReceiveStatus.INVALID_MESSAGE
try:
self.logger.debug("Communicator %s waiting for message.", self.name)
receive_task = asyncio.ensure_future(self.receive_method(self.reader))
Expand All @@ -592,6 +605,7 @@ async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]:
try:
received = receive_task.result()
except asyncio.IncompleteReadError:
received_status = ReceiveStatus.SOCKET_CLOSED
self.logger.info("Socket closed by peer, stopping communication.")
received = None
if received is not None:
Expand All @@ -601,25 +615,28 @@ async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]:
received[1]["client_id"] = received[2]
assert Message.is_valid(received[1])
result = received[0], Message.from_dict(received[1])
received_status = ReceiveStatus.OK
else:
received_status = ReceiveStatus.TERMINATED
self.logger.debug(
"Communicator %s _receive_message: terminating flag is set, returning None",
self.name,
)
# Do not log cancelled errors.
except asyncio.CancelledError:
received_status = ReceiveStatus.CANCELLED
self.logger.debug(
"Communicator %s: CancelledError in _receive_message.", self.name
)
except:
received_status = ReceiveStatus.INVALID_MESSAGE
self.logger.exception(
"Communicator %s: exception in _receive_message.", self.name
)
finally:
# Always stop both tasks.
receive_task.cancel()
terminating_task.cancel()
return result
return received_status, result

async def _send_message(
self,
Expand Down Expand Up @@ -714,13 +731,17 @@ async def start_listening(self):
"""
try:
while True:
received = await self._receive_message()
status, received = await self._receive_message()

if received is None:
self.logger.info(
f"Communicator {self.name}: received empty message, closing communicator."
)
break
if status in [
ReceiveStatus.SOCKET_CLOSED,
ReceiveStatus.TERMINATED,
ReceiveStatus.CANCELLED,
]:
break
elif ReceiveStatus.INVALID_MESSAGE:
continue

identity, message = received

Expand Down
12 changes: 0 additions & 12 deletions resolwe/flow/executors/zeromq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import json
import os
from contextlib import suppress
from logging import Logger
from threading import Lock
from typing import Any, Optional, Tuple
Expand Down Expand Up @@ -102,14 +101,3 @@ def instance(cls, context=None):
cls._instance = cls(context=context)
cls._instance_pid = os.getpid()
return cls._instance

def start(self):
"""Ignore possible exception when testing."""
# The is_testing is not available in the executor so it is imported here.
from resolwe.test.utils import is_testing

if is_testing():
with suppress(zmq.error.ZMQError):
super().start()
else:
super().start()
3 changes: 0 additions & 3 deletions resolwe/flow/tests/test_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ def _check_filter(
request = factory.get("/", query_args, format="json")
force_authenticate(request, user or self.admin)
response = self.viewset(request)
print("Got response")
print(response)
print(response.data)

if status.is_success(response.status_code):
self.assertEqual(len(response.data), len(expected))
Expand Down
2 changes: 2 additions & 0 deletions resolwe/test_helpers/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ async def _run_on_infrastructure(meth, *args, **kwargs):
logger.exception("Exception while running test")
finally:
zmq_info.authenticator.stop()
# Make sure authenticator task is stopped.
await asyncio.sleep(0.1)
logger.debug("test_runner: Terminating listener")


Expand Down

0 comments on commit 372ad13

Please sign in to comment.