Fix retry-queue's enqueue() blocked by semaphore
Before, the retry-queue's semaphore had to be acquired in order to
enqueue messages.
This made the supposedly asynchronous enqueue() method, and all
methods calling it, dependent on the greenlet that processes the
queue successfully handling every message in it.

Effectively this resulted in a synchronous enqueue() call and could
cause the state-machine to block during long-running requests or
retries in the transport layer.

Fixes: raiden-network#7120
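
To illustrate the issue, here is a minimal sketch, not the Raiden implementation: the class names, helper methods and the 5-second sleep are invented, only the locking pattern mirrors the change. In the old shape enqueue() competes for the consumer's semaphore; in the new shape the semaphore stays internal to the consumer greenlet.

# Minimal sketch, not the Raiden code: names and numbers are invented.
import gevent
from gevent.event import Event
from gevent.lock import Semaphore


class BlockingQueueSketch:
    """Old shape: enqueue() competes for the same lock as the send loop."""

    def __init__(self) -> None:
        self._lock = Semaphore()
        self._queue: list = []
        self._notify = Event()

    def enqueue(self, message: str) -> None:
        # If the consumer greenlet holds the lock while a send is retrying,
        # the caller (ultimately the state machine) blocks right here.
        with self._lock:
            self._queue.append(message)
            self._notify.set()

    def _run(self) -> None:
        while True:
            self._notify.wait()
            with self._lock:
                self._notify.clear()
                gevent.sleep(5)  # stands in for a long-running send or retry
                self._queue.clear()


class NonBlockingQueueSketch:
    """New shape: the lock only serializes send passes inside the consumer."""

    def __init__(self) -> None:
        self._lock = Semaphore()
        self._queue: list = []
        self._notify = Event()

    def enqueue(self, message: str) -> None:
        # Appending and setting the event never wait on the consumer,
        # so enqueue() is cheap and effectively asynchronous again.
        self._queue.append(message)
        self._notify.set()

    def _run(self) -> None:
        while True:
            self._notify.wait()
            self._notify.clear()
            workload = self._queue[:]  # snapshot the current workload
            self._queue.clear()
            with self._lock:  # only guards against overlapping send passes
                self._send(workload)

    def _send(self, workload: list) -> None:
        gevent.sleep(5)  # stands in for a long-running send or retry

With the old shape a caller of enqueue() can end up waiting out an entire send/retry pass; with the new shape enqueue() only appends and sets an event, so it returns immediately.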
ezdac committed Jun 3, 2021
1 parent b27ff2b commit dbaea47
Showing 2 changed files with 72 additions and 78 deletions.
149 changes: 72 additions & 77 deletions raiden/network/transport/matrix/transport.py
@@ -114,6 +114,14 @@ class MessagesQueue:
messages: List[Tuple[Message, Optional[AddressMetadata]]]


def metadata_key_func(message_data: "_RetryQueue._MessageData") -> str:
address_metadata = message_data.address_metadata
if address_metadata is None:
return ""
uid = address_metadata.get("user_id", "")
return uid


class _RetryQueue(Runnable):
"""A helper Runnable to send batched messages to receiver through transport"""

@@ -132,6 +140,9 @@ def __init__(self, transport: "MatrixTransport", receiver: Address) -> None:
self.receiver = receiver
self._message_queue: List[_RetryQueue._MessageData] = list()
self._notify_event = gevent.event.Event()
# The lock is only used internally in the thread that consumes the message queue,
# so that there are no concurrent _check_and_send calls.
# It should never be acquired in any call-stack that is publicly accessible!
self._lock = Semaphore()
self._idle_since: int = 0 # Counter of idle iterations
super().__init__()
@@ -171,40 +182,38 @@ def enqueue(
)
assert queue_identifier.recipient == self.receiver, msg

with self._lock:
timeout_generator = timeout_exponential_backoff(
self.transport._config.retries_before_backoff,
self.transport._config.retry_interval_initial,
self.transport._config.retry_interval_max,
timeout_generator = timeout_exponential_backoff(
self.transport._config.retries_before_backoff,
self.transport._config.retry_interval_initial,
self.transport._config.retry_interval_max,
)

encoded_messages = list()
for message, address_metadata in messages:
already_queued = any(
queue_identifier == data.queue_identifier and message == data.message
for data in self._message_queue
)

encoded_messages = list()
for message, address_metadata in messages:
already_queued = any(
queue_identifier == data.queue_identifier and message == data.message
for data in self._message_queue
if already_queued:
self.log.warning(
"Message already in queue - ignoring",
receiver=to_checksum_address(self.receiver),
queue=queue_identifier,
message=redact_secret(DictSerializer.serialize(message)),
)
else:
expiration_generator = self._expiration_generator(timeout_generator)
data = _RetryQueue._MessageData(
queue_identifier=queue_identifier,
message=message,
text=MessageSerializer.serialize(message),
expiration_generator=expiration_generator,
address_metadata=address_metadata,
)
encoded_messages.append(data)

if already_queued:
self.log.warning(
"Message already in queue - ignoring",
receiver=to_checksum_address(self.receiver),
queue=queue_identifier,
message=redact_secret(DictSerializer.serialize(message)),
)
else:
expiration_generator = self._expiration_generator(timeout_generator)
data = _RetryQueue._MessageData(
queue_identifier=queue_identifier,
message=message,
text=MessageSerializer.serialize(message),
expiration_generator=expiration_generator,
address_metadata=address_metadata,
)
encoded_messages.append(data)

self._message_queue.extend(encoded_messages)

self._message_queue.extend(encoded_messages)
self.notify()

def enqueue_unordered(
@@ -220,43 +229,9 @@ def enqueue_unordered(

def notify(self) -> None:
"""Notify main loop to check if anything needs to be sent"""
with self._lock:
self._notify_event.set()

def _batch_by_user_id(
self,
) -> List[Tuple[List[_MessageData], Optional[AddressMetadata]]]:
"""
This method will batch the message data in the retry-queue by the UserID
that might be attached to the message-data by additional AddressMetadata.
The method will return batches of message-data, batched by the value of the "user_id"
key in the message data's address-metadata.
If there is no address-metadata present, or if the address-metadata does not have a
"user_id" key, this message-data will be batched to the `None` batch.
"""
self._notify_event.set()

def key_func(message_data: "_RetryQueue._MessageData") -> str:
address_metadata = message_data.address_metadata
if address_metadata is None:
return ""
uid = address_metadata.get("user_id", "")
return uid

batched_messages = list()
queue_by_user_id = sorted(self._message_queue[:], key=key_func)
for user_id, batch in itertools.groupby(queue_by_user_id, key_func):
message_data_batch = list(batch)
if user_id == "":
metadata = None
else:
# simply use the first metadata in the list, even though
# there could be discrepancies along the batch
metadata = message_data_batch[0].address_metadata
batched_messages.append((message_data_batch, metadata))
return batched_messages

def _check_and_send(self) -> None:
def _check_and_send(self, messages: "List[_RetryQueue._MessageData]") -> None:
"""Check and send all pending/queued messages that are not waiting on retry timeout
After composing the to-be-sent message, also purge the message queue from messages that are not
@@ -294,9 +269,19 @@ def message_is_in_queue(message_data: _RetryQueue._MessageData) -> bool:
for send_event in self.transport._queueids_to_queues[message_data.queue_identifier]
)

for subqueue, address_metadata in self._batch_by_user_id():
# batch by user_id, so that we can potentially combine data for send-to-device calls
queue_by_user_id = sorted(messages[:], key=metadata_key_func)
for user_id, batch in itertools.groupby(queue_by_user_id, metadata_key_func):
message_data_batch = list(batch)
if user_id == "":
address_metadata = None
else:
# simply use the first metadata in the list, even though
# there could be discrepancies along the batch
address_metadata = message_data_batch[0].address_metadata

message_texts: List[str] = list()
for message_data in subqueue:
for message_data in message_data_batch:
# Messages are sent on two conditions:
# - Non-retryable (e.g. Delivered)
# - Those are immediately removed from the local queue
@@ -333,9 +318,13 @@ def message_is_in_queue(message_data: _RetryQueue._MessageData) -> bool:
message_data.message.message_identifier,
)
] += 1

if remove:
self._message_queue.remove(message_data)
messages.remove(message_data)

for message in messages:
# prepend messages to be retried to the message-queue again in the original order
# so that they can eventually be retried at a later point
self._message_queue.insert(0, message)

if message_texts:
self.log.debug(
Expand All @@ -361,13 +350,19 @@ def _run(self) -> None: # type: ignore
# run while transport parent is running
while not self.transport._stop_event.ready():
# once entered the critical section, block any other enqueue or notify attempt
with self._lock:
self._notify_event.clear()
if self._message_queue:
self._idle_since = 0
self._check_and_send()
else:
self._idle_since += 1

self._notify_event.clear()
if self._message_queue:
self._idle_since = 0
# Shallow copy the current messages in the queue
# to be processed for this _check_and_send iteration
messages_current_workload = self._message_queue[:]
# then empty the message_queue
self._message_queue.clear()
with self._lock:
self._check_and_send(messages_current_workload)
else:
self._idle_since += 1

if self.is_idle:
# There have been no messages to process for a while. Exit.
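For reference, a condensed sketch of the reworked consumer pass shown above: snapshot and clear the shared queue, batch the snapshot by user_id via itertools.groupby, and prepend whatever was not sent so a later pass retries it. SimpleMessage, metadata_key and send_batch are invented stand-ins, not the Raiden types or transport API.

# Condensed sketch of the consumer pass; SimpleMessage and send_batch are
# invented stand-ins for _RetryQueue._MessageData and the Matrix transport.
import itertools
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SimpleMessage:
    text: str
    user_id: Optional[str]  # stand-in for address_metadata.get("user_id")


def metadata_key(message: SimpleMessage) -> str:
    # Messages without a user_id fall into the "" batch, mirroring
    # metadata_key_func in the diff above.
    return message.user_id or ""


def send_batch(user_id: Optional[str], texts: List[str]) -> bool:
    """Invented transport call; pretend every send succeeds."""
    print(f"send to {user_id!r}: {texts}")
    return True


def process_workload(queue: List[SimpleMessage]) -> None:
    # Snapshot the current workload and empty the shared queue so that
    # enqueue() can keep appending while this batch is being sent.
    workload = queue[:]
    queue.clear()

    # groupby() only merges adjacent equal keys, so sort by the same key first;
    # batching by user_id allows combining messages for the same user per call.
    for user_id, batch in itertools.groupby(sorted(workload, key=metadata_key), metadata_key):
        batch_list = list(batch)
        if send_batch(user_id or None, [m.text for m in batch_list]):
            for message in batch_list:
                workload.remove(message)

    # Whatever was not sent is prepended, keeping its relative order,
    # so a later pass retries it.
    queue[:0] = workload


if __name__ == "__main__":
    pending = [SimpleMessage("ping", "@alice:server"), SimpleMessage("ack", None)]
    process_workload(pending)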
@@ -16,7 +16,6 @@
make_client,
make_room_alias,
)

from raiden.tests.utils import factories
from raiden.tests.utils.detect_failure import raise_on_failure
from raiden.tests.utils.transport import ignore_messages, new_client
