diff --git a/changelog.d/9686.misc b/changelog.d/9686.misc new file mode 100644 index 000000000000..bb2335acf906 --- /dev/null +++ b/changelog.d/9686.misc @@ -0,0 +1 @@ +Improve Jaeger tracing for `to_device` messages. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 89df9a619b74..e9c8a9f20a66 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -29,6 +29,7 @@ from synapse.events import EventBase from synapse.federation.units import Edu from synapse.handlers.presence import format_user_presence_state +from synapse.logging.opentracing import SynapseTags, set_tag from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ReadReceipt @@ -557,6 +558,13 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int] contents, stream_id = await self._store.get_new_device_msgs_for_remote( self._destination, last_device_stream_id, to_device_stream_id, limit ) + for content in contents: + message_id = content.get("message_id") + if not message_id: + continue + + set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + edus = [ Edu( origin=self._server_name, diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 5ee48be6ffa7..c971eeb4d26c 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -21,10 +21,10 @@ from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( + SynapseTags, get_active_span_text_map, log_kv, set_tag, - start_active_span, ) from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import JsonDict, Requester, UserID, get_domain_from_id @@ -183,7 +183,10 @@ async def send_device_message( ) -> None: sender_user_id = requester.user.to_string() - set_tag("number_of_messages", len(messages)) + message_id = random_string(16) + set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + + log_kv({"number_of_to_device_messages": len(messages)}) set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]] @@ -205,32 +208,35 @@ async def send_device_message( "content": message_content, "type": message_type, "sender": sender_user_id, + "message_id": message_id, } for device_id, message_content in by_device.items() } if messages_by_device: local_messages[user_id] = messages_by_device + log_kv( + { + "user_id": user_id, + "device_id": list(messages_by_device), + } + ) else: destination = get_domain_from_id(user_id) remote_messages.setdefault(destination, {})[user_id] = by_device - message_id = random_string(16) - context = get_active_span_text_map() remote_edu_contents = {} for destination, messages in remote_messages.items(): - with start_active_span("to_device_for_user"): - set_tag("destination", destination) - remote_edu_contents[destination] = { - "messages": messages, - "sender": sender_user_id, - "type": message_type, - "message_id": message_id, - "org.matrix.opentracing_context": json_encoder.encode(context), - } + log_kv({"destination": destination}) + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + "org.matrix.opentracing_context": json_encoder.encode(context), + } - log_kv({"local_messages": local_messages}) stream_id = await self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -239,7 +245,6 @@ async def send_device_message( "to_device_key", stream_id, users=local_messages.keys() ) - log_kv({"remote_messages": remote_messages}) if self.federation_sender: for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ee607e6e6576..7b356ba7e5a8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -24,6 +24,7 @@ from synapse.api.filtering import FilterCollection from synapse.events import EventBase from synapse.logging.context import current_context +from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter @@ -340,7 +341,14 @@ async def current_sync_for_user( full_state: bool = False, ) -> SyncResult: """Get the sync for client needed to match what the server has now.""" - return await self.generate_sync_result(sync_config, since_token, full_state) + with start_active_span("current_sync_for_user"): + log_kv({"since_token": since_token}) + sync_result = await self.generate_sync_result( + sync_config, since_token, full_state + ) + + set_tag(SynapseTags.SYNC_RESULT, bool(sync_result)) + return sync_result async def push_rules_for_user(self, user: UserID) -> JsonDict: user_id = user.to_string() @@ -964,6 +972,7 @@ async def generate_sync_result( # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` now_token = self.event_sources.get_current_token() + log_kv({"now_token": now_token}) logger.debug( "Calculating sync response for %r between %s and %s", @@ -1225,6 +1234,13 @@ async def _generate_sync_entry_for_to_device( user_id, device_id, since_stream_id, now_token.to_device_key ) + for message in messages: + # We pop here as we shouldn't be sending the message ID down + # `/sync` + message_id = message.pop("message_id", None) + if message_id: + set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", len(messages), diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 096d199f4cf1..bb35af099d7a 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -19,7 +19,10 @@ from synapse.api.errors import AuthError, ShadowBanError, SynapseError from synapse.appservice import ApplicationService -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.replication.tcp.streams import TypingStream from synapse.types import JsonDict, Requester, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -86,6 +89,7 @@ def _reset(self) -> None: self._member_last_federation_poke = {} self.wheel_timer = WheelTimer(bucket_size=5000) + @wrap_as_background_process("typing._handle_timeouts") def _handle_timeouts(self) -> None: logger.debug("Checking for typing timeouts") diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index aa146e8bb8b2..b8081f197e87 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -259,6 +259,14 @@ def report_span(self, span): logger = logging.getLogger(__name__) +class SynapseTags: + # The message ID of any to_device message processed + TO_DEVICE_MESSAGE_ID = "to_device.message_id" + + # Whether the sync response has new data to be returned to the client. + SYNC_RESULT = "sync.new_data" + + # Block everything by default # A regex which matches the server_names to expose traces for. # None means 'block everything'. diff --git a/synapse/notifier.py b/synapse/notifier.py index 1374aae49051..d35c1f3f02e5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -39,6 +39,7 @@ from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import PreserveLoggingContext +from synapse.logging.opentracing import log_kv, start_active_span from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig @@ -136,6 +137,15 @@ def notify( self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred + log_kv( + { + "notify": self.user_id, + "stream": stream_key, + "stream_id": stream_id, + "listeners": self.count_listeners(), + } + ) + users_woken_by_stream_counter.labels(stream_key).inc() with PreserveLoggingContext(): @@ -404,6 +414,13 @@ def on_new_event( with Measure(self.clock, "on_new_event"): user_streams = set() + log_kv( + { + "waking_up_explicit_users": len(users), + "waking_up_explicit_rooms": len(rooms), + } + ) + for user in users: user_stream = self.user_to_user_stream.get(str(user)) if user_stream is not None: @@ -476,12 +493,34 @@ async def wait_for_events( (end_time - now) / 1000.0, self.hs.get_reactor(), ) - with PreserveLoggingContext(): - await listener.deferred + + with start_active_span("wait_for_events.deferred"): + log_kv( + { + "wait_for_events": "sleep", + "token": prev_token, + } + ) + + with PreserveLoggingContext(): + await listener.deferred + + log_kv( + { + "wait_for_events": "woken", + "token": user_stream.current_token, + } + ) current_token = user_stream.current_token result = await callback(prev_token, current_token) + log_kv( + { + "wait_for_events": "result", + "result": bool(result), + } + ) if result: break @@ -489,8 +528,10 @@ async def wait_for_events( # has happened between the old prev_token and the current_token prev_token = current_token except defer.TimeoutError: + log_kv({"wait_for_events": "timeout"}) break except defer.CancelledError: + log_kv({"wait_for_events": "cancelled"}) break if result is None: