diff --git a/changelog.d/9959.misc b/changelog.d/9959.misc new file mode 100644 index 000000000000..7231f29d79aa --- /dev/null +++ b/changelog.d/9959.misc @@ -0,0 +1 @@ +Add debug logging for lost/delayed to-device messages. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3b053ebcfb08..3a2efd56eea9 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -28,6 +28,7 @@ from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state +from synapse.logging import issue9533_logger from synapse.logging.opentracing import SynapseTags, set_tag from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process @@ -574,6 +575,14 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int] for content in contents ] + if edus: + issue9533_logger.debug( + "Sending %i to-device messages to %s, up to stream id %i", + len(edus), + self._destination, + stream_id, + ) + return (edus, stream_id) def _start_catching_up(self) -> None: diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py index e00969f8b18c..b50a4f95eb3a 100644 --- a/synapse/logging/__init__.py +++ b/synapse/logging/__init__.py @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -# These are imported to allow for nicer logging configuration files. +import logging + from synapse.logging._remote import RemoteHandler from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter +# These are imported to allow for nicer logging configuration files. __all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"] + +# Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc +issue9533_logger = logging.getLogger("synapse.9533_debug") diff --git a/synapse/notifier.py b/synapse/notifier.py index b9531007e27c..24b4e6649f18 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -38,6 +38,7 @@ from synapse.api.errors import AuthError from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state +from synapse.logging import issue9533_logger from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import log_kv, start_active_span from synapse.logging.utils import log_function @@ -426,6 +427,13 @@ def on_new_event( for room in rooms: user_streams |= self.room_to_user_streams.get(room, set()) + if stream_key == "to_device_key": + issue9533_logger.debug( + "to-device messages stream id %s, awaking streams for %s", + new_token, + users, + ) + time_now_ms = self.clock.time_msec() for user_stream in user_streams: try: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 4f3c6a18b6cf..62d780917500 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -51,7 +51,6 @@ logger = logging.getLogger(__name__) - # How long we allow callers to wait for replication updates before timing out. _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30 diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 7c9d1f744eaf..50e7ddd7355b 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -15,6 +15,7 @@ import logging from typing import List, Optional, Tuple +from synapse.logging import issue9533_logger from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.replication.tcp.streams import ToDeviceStream from synapse.storage._base import SQLBaseStore, db_to_json @@ -404,6 +405,13 @@ def add_messages_txn(txn, now_ms, stream_id): ], ) + if remote_messages_by_destination: + issue9533_logger.debug( + "Queued outgoing to-device messages with stream_id %i for %s", + stream_id, + list(remote_messages_by_destination.keys()), + ) + async with self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() await self.db_pool.runInteraction( @@ -533,6 +541,16 @@ def _add_messages_to_local_device_inbox_txn( ], ) + issue9533_logger.debug( + "Stored to-device messages with stream_id %i for %s", + stream_id, + [ + (user_id, device_id) + for (user_id, messages_by_device) in local_by_user_then_device.items() + for device_id in messages_by_device.keys() + ], + ) + class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"