This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve tracing for to device messages #9686

Merged 7 commits on Apr 1, 2021
Changes from 2 commits
8 changes: 8 additions & 0 deletions synapse/federation/sender/per_destination_queue.py
@@ -29,6 +29,7 @@
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
@@ -557,6 +558,13 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
contents, stream_id = await self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id, limit
)
for content in contents:
message_id = content.get("message_id")
if not message_id:
continue

set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

edus = [
Edu(
origin=self._server_name,
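For context on the hunk above: SynapseTags (added to synapse/logging/opentracing.py further down) is just a namespace of string constants, and set_tag attaches a key/value pair to whatever span is currently active. A minimal usage sketch follows (the span name and message ID are illustrative, not from the diff); note that span tags are a flat key/value map, so each set_tag call in the loop above overwrites the previous one and only the last message_id in the batch survives on the span:

from synapse.logging.opentracing import SynapseTags, set_tag, start_active_span

with start_active_span("get_to_device_edus"):
    # Repeated calls with the same key replace the value on the active span.
    set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, "ABCDEFGHijklmnop")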
35 changes: 20 additions & 15 deletions synapse/handlers/devicemessage.py
@@ -21,10 +21,10 @@
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
SynapseTags,
get_active_span_text_map,
log_kv,
set_tag,
start_active_span,
)
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
@@ -182,7 +182,10 @@ async def send_device_message(
) -> None:
sender_user_id = requester.user.to_string()

set_tag("number_of_messages", len(messages))
message_id = random_string(16)
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

log_kv({"number_of_to_device_messages": len(messages)})
set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
@@ -204,32 +207,35 @@ async def send_device_message(
"content": message_content,
"type": message_type,
"sender": sender_user_id,
"message_id": message_id,
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
log_kv(
{
"user_id": user_id,
"device_id": list(messages_by_device),
}
)
else:
destination = get_domain_from_id(user_id)
remote_messages.setdefault(destination, {})[user_id] = by_device

message_id = random_string(16)

context = get_active_span_text_map()

remote_edu_contents = {}
for destination, messages in remote_messages.items():
with start_active_span("to_device_for_user"):
set_tag("destination", destination)
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json_encoder.encode(context),
}
log_kv({"destination": destination})
remote_edu_contents[destination] = {
"messages": messages,
"sender": sender_user_id,
"type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json_encoder.encode(context),
}

log_kv({"local_messages": local_messages})
Member: Looks like the new log_kv({user_id, device_id}) above is meant to be the replacement for this log_kv and the one below. Is that correct?

Member Author: Yup. This log was a bit useless as it gave the entire message, which then got truncated by Jaeger, so you couldn't really see what was going on.

stream_id = await self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
@@ -238,7 +244,6 @@ async def send_device_message(
"to_device_key", stream_id, users=local_messages.keys()
)

log_kv({"remote_messages": remote_messages})
if self.federation_sender:
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
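To make the trade-off from the review thread above concrete, here is a sketch contrasting the removed whole-payload log with its per-recipient replacement (names taken from the diff; this is an illustration, not the PR's code):

# Before: the entire message dict went into the span log, and Jaeger
# truncated it, so nothing useful was visible in the trace.
log_kv({"local_messages": local_messages})

# After: only identifying keys are logged, one compact entry per recipient.
for user_id, messages_by_device in local_messages.items():
    log_kv({"user_id": user_id, "device_id": list(messages_by_device)})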
16 changes: 15 additions & 1 deletion synapse/handlers/sync.py
@@ -24,6 +24,7 @@
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
@@ -340,7 +341,14 @@ async def current_sync_for_user(
full_state: bool = False,
) -> SyncResult:
"""Get the sync for client needed to match what the server has now."""
return await self.generate_sync_result(sync_config, since_token, full_state)
with start_active_span("current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
)

set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result

async def push_rules_for_user(self, user: UserID) -> JsonDict:
user_id = user.to_string()
@@ -964,6 +972,7 @@ async def generate_sync_result(
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})

logger.debug(
"Calculating sync response for %r between %s and %s",
@@ -1225,6 +1234,11 @@ async def _generate_sync_entry_for_to_device(
user_id, device_id, since_stream_id, now_token.to_device_key
)

for message in messages:
message_id = message.get("message_id", None)
if message_id:
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
len(messages),
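A minimal sketch of how the helpers used in this file nest (the span name and values are illustrative): log_kv attaches a structured log entry to the active span, while set_tag sets a tag that can be filtered on in Jaeger:

from synapse.logging.opentracing import log_kv, set_tag, start_active_span

with start_active_span("current_sync_for_user"):
    log_kv({"since_token": "s123_456"})  # structured log entry on the span
    set_tag("sync.new_data", True)       # queryable tag on the span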
8 changes: 8 additions & 0 deletions synapse/logging/opentracing.py
@@ -259,6 +259,14 @@ def report_span(self, span):
logger = logging.getLogger(__name__)


class SynapseTags:
# The message ID of any to_device message processed
TO_DEVICE_MESSAGE_ID = "to_device.message_id"

# Whether the sync response has new data to be returned to the client.
SYNC_RESULT = "sync.new_data"
Comment on lines +266 to +267

Member: Is the purpose of this to check whether a to_device message came down /sync for a user? If so, then won't events other than to_device messages lower the signal-to-noise ratio?

Member Author: Ish, I was more interested in signalling whether we'd found anything or not. I might go in and add per-section flags later, but I'm a bit wary of adding too much at once.



# Block everything by default
# A regex which matches the server_names to expose traces for.
# None means 'block everything'.
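The per-section flags the author mentions might look something like this sketch (hypothetical tag names, not part of this PR; the to_device and presence fields are assumed from SyncResult in the sync handler above):

# Hypothetical finer-grained variants of SYNC_RESULT, one flag per section.
set_tag("sync.to_device_result", bool(sync_result.to_device))
set_tag("sync.presence_result", bool(sync_result.presence))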
45 changes: 43 additions & 2 deletions synapse/notifier.py
@@ -39,6 +39,7 @@
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.streams.config import PaginationConfig
@@ -136,6 +137,15 @@ def notify(
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred

log_kv(
{
"notify": self.user_id,
"stream": stream_key,
"stream_id": stream_id,
"listeners": self.count_listeners(),
}
)

users_woken_by_stream_counter.labels(stream_key).inc()

with PreserveLoggingContext():
@@ -404,6 +414,13 @@ def on_new_event(
with Measure(self.clock, "on_new_event"):
user_streams = set()

log_kv(
{
"waking_up_explicit_users": len(users),
"waking_up_explicit_rooms": len(rooms),
}
)

for user in users:
user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None:
@@ -476,21 +493,45 @@ async def wait_for_events(
(end_time - now) / 1000.0,
self.hs.get_reactor(),
)
with PreserveLoggingContext():
await listener.deferred

with start_active_span("wait_for_events.deferred"):
log_kv(
{
"wait_for_events": "sleep",
"token": prev_token,
}
)

with PreserveLoggingContext():
await listener.deferred

log_kv(
{
"wait_for_events": "woken",
"token": user_stream.current_token,
}
)

current_token = user_stream.current_token

result = await callback(prev_token, current_token)
log_kv(
{
"wait_for_events": "result",
"result": bool(result),
}
)
if result:
break

# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except defer.TimeoutError:
log_kv({"wait_for_events": "timeout"})
break
except defer.CancelledError:
log_kv({"wait_for_events": "cancelled"})
break

if result is None: