diff --git a/resolwe/flow/executors/socket_utils.py b/resolwe/flow/executors/socket_utils.py index 71dde6582..4dc8f2f18 100644 --- a/resolwe/flow/executors/socket_utils.py +++ b/resolwe/flow/executors/socket_utils.py @@ -52,6 +52,16 @@ class PeerStatus(Enum): UNRESPONSIVE = "unresponsive" +class ReceiveStatus(Enum): + """Status of the received method.""" + + OK = "OK" + INVALID_MESSAGE = "invalid" + CANCELLED = "cancelled" + SOCKET_CLOSED = "socket_closed" + TERMINATED = "terminated" + + PeerIdentity = bytes MessageDataType = TypeVar("MessageDataType") ResponseDataType = TypeVar("ResponseDataType") @@ -563,7 +573,9 @@ def call_command(*args): else: return call_command - async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]: + async def _receive_message( + self, + ) -> Tuple[ReceiveStatus, Optional[Tuple[PeerIdentity, Message]]]: """Receive a single message. This method is blocking: it waits for the message to arrive. The @@ -581,6 +593,7 @@ async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]: the key "type". """ result = None + received_status = ReceiveStatus.INVALID_MESSAGE try: self.logger.debug("Communicator %s waiting for message.", self.name) receive_task = asyncio.ensure_future(self.receive_method(self.reader)) @@ -592,6 +605,7 @@ async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]: try: received = receive_task.result() except asyncio.IncompleteReadError: + received_status = ReceiveStatus.SOCKET_CLOSED self.logger.info("Socket closed by peer, stopping communication.") received = None if received is not None: @@ -601,17 +615,20 @@ async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]: received[1]["client_id"] = received[2] assert Message.is_valid(received[1]) result = received[0], Message.from_dict(received[1]) + received_status = ReceiveStatus.OK else: + received_status = ReceiveStatus.TERMINATED self.logger.debug( "Communicator %s _receive_message: terminating flag is set, returning None", self.name, ) - # Do not log cancelled errors. except asyncio.CancelledError: + received_status = ReceiveStatus.CANCELLED self.logger.debug( "Communicator %s: CancelledError in _receive_message.", self.name ) except: + received_status = ReceiveStatus.INVALID_MESSAGE self.logger.exception( "Communicator %s: exception in _receive_message.", self.name ) @@ -619,7 +636,7 @@ async def _receive_message(self) -> Optional[Tuple[PeerIdentity, Message]]: # Always stop both tasks. receive_task.cancel() terminating_task.cancel() - return result + return received_status, result async def _send_message( self, @@ -714,13 +731,17 @@ async def start_listening(self): """ try: while True: - received = await self._receive_message() + status, received = await self._receive_message() if received is None: - self.logger.info( - f"Communicator {self.name}: received empty message, closing communicator." - ) - break + if status in [ + ReceiveStatus.SOCKET_CLOSED, + ReceiveStatus.TERMINATED, + ReceiveStatus.CANCELLED, + ]: + break + elif ReceiveStatus.INVALID_MESSAGE: + continue identity, message = received diff --git a/resolwe/flow/executors/zeromq_utils.py b/resolwe/flow/executors/zeromq_utils.py index 2fa2d1230..d44d87b99 100644 --- a/resolwe/flow/executors/zeromq_utils.py +++ b/resolwe/flow/executors/zeromq_utils.py @@ -2,7 +2,6 @@ import json import os -from contextlib import suppress from logging import Logger from threading import Lock from typing import Any, Optional, Tuple @@ -102,14 +101,3 @@ def instance(cls, context=None): cls._instance = cls(context=context) cls._instance_pid = os.getpid() return cls._instance - - def start(self): - """Ignore possible exception when testing.""" - # The is_testing is not available in the executor so it is imported here. - from resolwe.test.utils import is_testing - - if is_testing(): - with suppress(zmq.error.ZMQError): - super().start() - else: - super().start() diff --git a/resolwe/flow/tests/test_filtering.py b/resolwe/flow/tests/test_filtering.py index 74cb232a5..628a8abca 100644 --- a/resolwe/flow/tests/test_filtering.py +++ b/resolwe/flow/tests/test_filtering.py @@ -70,9 +70,6 @@ def _check_filter( request = factory.get("/", query_args, format="json") force_authenticate(request, user or self.admin) response = self.viewset(request) - print("Got response") - print(response) - print(response.data) if status.is_success(response.status_code): self.assertEqual(len(response.data), len(expected)) diff --git a/resolwe/test_helpers/test_runner.py b/resolwe/test_helpers/test_runner.py index 097cf687d..24c7983cd 100644 --- a/resolwe/test_helpers/test_runner.py +++ b/resolwe/test_helpers/test_runner.py @@ -329,6 +329,8 @@ async def _run_on_infrastructure(meth, *args, **kwargs): logger.exception("Exception while running test") finally: zmq_info.authenticator.stop() + # Make sure authenticator task is stopped. + await asyncio.sleep(0.1) logger.debug("test_runner: Terminating listener")