diff --git a/docs/CHANGELOG.rst b/docs/CHANGELOG.rst index 100dfecc6..ac15f5d5c 100644 --- a/docs/CHANGELOG.rst +++ b/docs/CHANGELOG.rst @@ -19,6 +19,7 @@ Fix Added ----- - Add ``extend_lock`` method to redis cache in listener +- Add extending processing lock task to the listener =================== diff --git a/resolwe/flow/managers/listener/listener.py b/resolwe/flow/managers/listener/listener.py index 6c3612f2d..d3c9645d6 100644 --- a/resolwe/flow/managers/listener/listener.py +++ b/resolwe/flow/managers/listener/listener.py @@ -381,22 +381,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response: """ data_id = abs(int(identity)) - match cache_manager.wait(Data, [(data_id, message.uuid)]).pop(): - case RedisLockStatus.OK: - logger.debug( - __("Message with uuid={} is already being processed.", message.uuid) - ) - return message.respond_skip("Message already processed.") - case RedisLockStatus.ERROR: - logger.debug( - __("Message with uuid={} is already being processed.", message.uuid) - ) - return message.respond_error("Error processing message.") - case RedisLockStatus.PROCESSING: - return message.respond_skip("Message processing.") - - # Lock the message so it is not processed by another worker. - cache_manager.lock(Data, [(data_id, message.uuid)]) # Do not proccess messages from Workers that have already finish # processing data objects. @@ -406,7 +390,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response: if not self._can_process_object( worker_status, data_status, message.command_name ): - cache_manager.unlock(Data, [(data_id, message.uuid)]) return message.respond_error( f"Unable to process the data object {data_id} with status {data_status}." ) @@ -416,7 +399,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response: if not handler: error = f"Unknow command '{message.command_name}'." self._log_error(self.data(data_id), error, save_to_data_object=False) - cache_manager.unlock(Data, [(data_id, message.uuid)]) return message.respond_error(error) # Set the data started on the first command. @@ -429,7 +411,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response: # Check if data status was changed by the handler. if self.get_data_fields(data_id, "status") == Data.STATUS_ERROR: response.status = ResponseStatus.ERROR - cache_manager.unlock(Data, [(data_id, message.uuid)]) return response except ValidationError as err: error = ( @@ -437,7 +418,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response: f"Data object with the id {data_id}: {err}." ) self._log_exception(self.data(data_id), error) - cache_manager.unlock(Data, [(data_id, message.uuid)], RedisLockStatus.ERROR) return message.respond_error("Validation error") except Exception as err: error = ( @@ -445,7 +425,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response: f"object with the id {data_id}: {err}." ) self._log_exception(self.data(data_id), error) - cache_manager.unlock(Data, [(data_id, message.uuid)], RedisLockStatus.ERROR) return message.respond_error(f"Error in command handler '{handler_name}'.") def notify_dispatcher_abort(self, data_id: int): @@ -630,15 +609,94 @@ async def handle_liveness_probe( """Respond to the liveness probe.""" return message.respond_ok(True) + def _handle_lock_message_error( + self, lock_status: RedisLockStatus, received_message: Message + ) -> Response: + """Construct and return the message to be sent.""" + message = f"Message {received_message.uuid} " + match lock_status: + case RedisLockStatus.OK: + message += "already processed with OK status." + response_method = received_message.respond_skip + case RedisLockStatus.ERROR: + message += "already processed with error status." + response_method = received_message.respond_error + case RedisLockStatus.PROCESSING: + message += "is processing." + response_method = received_message.respond_skip + logger.debug(message) + return response_method(message) + + async def extend_processing_lock( + self, + data_id: int, + message_uuid: str, + refresh_interval: int = 3 * 60, + extend_for: int = 5 * 60, + ): + """Extend the processing lock for the given message. + + :attr refresh_interval: extend the key every this many seconds. + :attr extend_for: set the key TTL to this many seconds. + """ + while True: + await asyncio.sleep(refresh_interval) + try: + cache_manager.extend_lock( + Data, [(data_id, message_uuid)], valid_for=extend_for + ) + except Exception: + logger.exception("Error extending lock.") + async def default_command_handler( self, received_message: Message, peer_identity: PeerIdentity ) -> Response: - """Process command.""" - # Executor uses separate identity of the form f"e_{data.id}". - response = await database_sync_to_async( - self._message_processor.process_command, thread_sensitive=False - )(peer_identity, received_message) + """Process command. + Lock the command before the processing starts and unlock it when it finishes. + """ + response: Optional[Response] = None + extend_lock_task: Optional[asyncio.Task] = None + try: + # Try to lock the specific message for the data object. If locked it means + # the message was/is being processed by some other listener. + # If the listener crashes after successfully obtaining the lock, it will be + # released automatically after five minutes. + # If the processing of the message takes longer than 10 minutes, then same + # message can be processed twice. We can avoid processing the message twice + # but then all processes that have messages in the listener queue will fail + # if the listener chashes. + data_id = abs(int(peer_identity)) + success, lock_status = cache_manager.lock( + Data, [(data_id, received_message.uuid)] + )[0] + if not success: + response = self._handle_lock_message_error( + lock_status, received_message + ) + else: + extend_lock_task = asyncio.create_task( + self.extend_processing_lock(data_id, received_message.uuid) + ) + response = await database_sync_to_async( + self._message_processor.process_command, thread_sensitive=False + )(peer_identity, received_message) + finally: + # Stop the extend lock task. + if extend_lock_task is not None: + extend_lock_task.cancel() + # Something crashed during processing and the response was not created. + # Send a generic error response. + if response is None: + response = received_message.respond_error("Error processing message.") + # Unlock the message. + if response.response_status == ResponseStatus.ERROR: + unlock_status = RedisLockStatus.ERROR + else: + unlock_status = RedisLockStatus.OK + cache_manager.unlock( + Data, [(data_id, received_message.uuid)], status=unlock_status + ) self.logger.debug(__("Response time: {}", received_message.time_elapsed())) return response