Add extending processing lock task
and move locking one level up to simplify the code.
gregorjerse committed Nov 27, 2023
1 parent fb5af6b commit 299589a
Showing 2 changed files with 85 additions and 26 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.rst
@@ -19,6 +19,7 @@ Fix
Added
-----
- Add ``extend_lock`` method to redis cache in listener
- Add extending processing lock task to the listener


===================
110 changes: 84 additions & 26 deletions resolwe/flow/managers/listener/listener.py
@@ -381,22 +381,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response:
"""
data_id = abs(int(identity))
match cache_manager.wait(Data, [(data_id, message.uuid)]).pop():
case RedisLockStatus.OK:
logger.debug(
__("Message with uuid={} is already being processed.", message.uuid)
)
return message.respond_skip("Message already processed.")
case RedisLockStatus.ERROR:
logger.debug(
__("Message with uuid={} is already being processed.", message.uuid)
)
return message.respond_error("Error processing message.")
case RedisLockStatus.PROCESSING:
return message.respond_skip("Message processing.")

# Lock the message so it is not processed by another worker.
cache_manager.lock(Data, [(data_id, message.uuid)])

        # Do not process messages from Workers that have already finished
        # processing data objects.
@@ -406,7 +390,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response:
        if not self._can_process_object(
            worker_status, data_status, message.command_name
        ):
            cache_manager.unlock(Data, [(data_id, message.uuid)])
            return message.respond_error(
                f"Unable to process the data object {data_id} with status {data_status}."
            )
@@ -416,7 +399,6 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response:
        if not handler:
            error = f"Unknown command '{message.command_name}'."
            self._log_error(self.data(data_id), error, save_to_data_object=False)
            cache_manager.unlock(Data, [(data_id, message.uuid)])
            return message.respond_error(error)

        # Set the data started on the first command.
@@ -429,23 +411,20 @@ def process_command(self, identity: PeerIdentity, message: Message) -> Response:
            # Check if data status was changed by the handler.
            if self.get_data_fields(data_id, "status") == Data.STATUS_ERROR:
                response.status = ResponseStatus.ERROR
            cache_manager.unlock(Data, [(data_id, message.uuid)])
            return response
        except ValidationError as err:
            error = (
                f"Validation error when running handler {handler_name} for "
                f"Data object with the id {data_id}: {err}."
            )
            self._log_exception(self.data(data_id), error)
            cache_manager.unlock(Data, [(data_id, message.uuid)], RedisLockStatus.ERROR)
            return message.respond_error("Validation error")
        except Exception as err:
            error = (
                f"Exception when running handler {handler_name} for data "
                f"object with the id {data_id}: {err}."
            )
            self._log_exception(self.data(data_id), error)
            cache_manager.unlock(Data, [(data_id, message.uuid)], RedisLockStatus.ERROR)
            return message.respond_error(f"Error in command handler '{handler_name}'.")

    def notify_dispatcher_abort(self, data_id: int):
@@ -630,15 +609,94 @@ async def handle_liveness_probe(
"""Respond to the liveness probe."""
return message.respond_ok(True)

def _handle_lock_message_error(
self, lock_status: RedisLockStatus, received_message: Message
) -> Response:
"""Construct and return the message to be sent."""
message = f"Message {received_message.uuid} "
match lock_status:
case RedisLockStatus.OK:
message += "already processed with OK status."
response_method = received_message.respond_skip
case RedisLockStatus.ERROR:
message += "already processed with error status."
response_method = received_message.respond_error
case RedisLockStatus.PROCESSING:
message += "is processing."
response_method = received_message.respond_skip
logger.debug(message)
return response_method(message)

async def extend_processing_lock(
self,
data_id: int,
message_uuid: str,
refresh_interval: int = 3 * 60,
extend_for: int = 5 * 60,
):
"""Extend the processing lock for the given message.
:attr refresh_interval: extend the key every this many seconds.
:attr extend_for: set the key TTL to this many seconds.
"""
while True:
await asyncio.sleep(refresh_interval)
try:
cache_manager.extend_lock(
Data, [(data_id, message_uuid)], valid_for=extend_for
)
except Exception:
logger.exception("Error extending lock.")

async def default_command_handler(
self, received_message: Message, peer_identity: PeerIdentity
) -> Response:
"""Process command."""
# Executor uses separate identity of the form f"e_{data.id}".
response = await database_sync_to_async(
self._message_processor.process_command, thread_sensitive=False
)(peer_identity, received_message)
"""Process command.
Lock the command before the processing starts and unlock it when it finishes.
"""
response: Optional[Response] = None
extend_lock_task: Optional[asyncio.Task] = None
try:
            # Try to lock the specific message for the data object. If locked, it means
            # the message was/is being processed by some other listener.
            # If the listener crashes after successfully obtaining the lock, it will be
            # released automatically after five minutes.
            # If the processing of the message takes longer than 10 minutes, the same
            # message can be processed twice. We could avoid processing the message twice,
            # but then all processes that have messages in the listener queue would fail
            # if the listener crashes.
            data_id = abs(int(peer_identity))
            success, lock_status = cache_manager.lock(
                Data, [(data_id, received_message.uuid)]
            )[0]
            if not success:
                response = self._handle_lock_message_error(
                    lock_status, received_message
                )
            else:
                extend_lock_task = asyncio.create_task(
                    self.extend_processing_lock(data_id, received_message.uuid)
                )
                response = await database_sync_to_async(
                    self._message_processor.process_command, thread_sensitive=False
                )(peer_identity, received_message)
        finally:
            # Stop the extend lock task.
            if extend_lock_task is not None:
                extend_lock_task.cancel()
            # Something crashed during processing and the response was not created.
            # Send a generic error response.
            if response is None:
                response = received_message.respond_error("Error processing message.")
            # Unlock the message.
            if response.response_status == ResponseStatus.ERROR:
                unlock_status = RedisLockStatus.ERROR
            else:
                unlock_status = RedisLockStatus.OK
            cache_manager.unlock(
                Data, [(data_id, received_message.uuid)], status=unlock_status
            )
        self.logger.debug(__("Response time: {}", received_message.time_elapsed()))
        return response
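
The committed code above follows a common keep-alive locking pattern: take a lock with a short TTL, extend it periodically from a background asyncio task while the message is processed, then cancel the task and release the lock in a finally block. Below is a minimal, self-contained sketch of that pattern for illustration only; TTLLock, extend_lock_periodically, and process_with_lock are hypothetical names rather than resolwe's cache_manager API, and a 3-second refresh with a 5-second TTL stands in for the 3-minute/5-minute defaults used in the diff.

import asyncio
import time
from typing import Optional


class TTLLock:
    """Hypothetical in-memory stand-in for the Redis-backed lock used by the listener."""

    def __init__(self) -> None:
        self._expires_at: Optional[float] = None

    def acquire(self, valid_for: float) -> bool:
        """Take the lock unless it is already held and not yet expired."""
        now = time.monotonic()
        if self._expires_at is not None and self._expires_at > now:
            return False
        self._expires_at = now + valid_for
        return True

    def extend(self, valid_for: float) -> None:
        """Push the expiration further into the future (the extend_lock role)."""
        self._expires_at = time.monotonic() + valid_for

    def release(self) -> None:
        """Drop the lock so another worker may take it."""
        self._expires_at = None


async def extend_lock_periodically(
    lock: TTLLock, refresh_interval: float, extend_for: float
) -> None:
    """Background task: extend the lock every refresh_interval seconds until cancelled."""
    while True:
        await asyncio.sleep(refresh_interval)
        lock.extend(extend_for)


async def process_with_lock(lock: TTLLock, work_seconds: float) -> str:
    """Acquire the lock, keep it alive while the work runs, then release it."""
    if not lock.acquire(valid_for=5):
        # Another worker already holds the lock: skip, mirroring the respond_skip branch.
        return "skipped: already locked"
    extend_task = asyncio.create_task(
        extend_lock_periodically(lock, refresh_interval=3, extend_for=5)
    )
    try:
        await asyncio.sleep(work_seconds)  # placeholder for the actual message processing
        return "ok"
    finally:
        extend_task.cancel()  # stop extending once processing has finished or failed
        lock.release()


if __name__ == "__main__":
    # The work takes longer than the initial TTL, but the background task keeps the lock alive.
    print(asyncio.run(process_with_lock(TTLLock(), work_seconds=7)))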
