Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix "db txn 'update_presence' from sentinel context" log messages #5275

Merged
merged 2 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5275.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix "db txn 'update_presence' from sentinel context" log messages.
99 changes: 47 additions & 52 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,27 @@ def __init__(self, hs):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
def run_timeout_handler():
return run_as_background_process(
"handle_presence_timeouts", self._handle_timeouts
)

self.clock.call_later(
30,
self.clock.looping_call,
self._handle_timeouts,
run_timeout_handler,
5000,
)

def run_persister():
return run_as_background_process(
"persist_presence_changes", self._persist_unpersisted_changes
)

self.clock.call_later(
60,
self.clock.looping_call,
self._persist_unpersisted_changes,
run_persister,
60 * 1000,
)

Expand Down Expand Up @@ -229,6 +239,7 @@ def _on_shutdown(self):
)

if self.unpersisted_users_changes:

yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in self.unpersisted_users_changes
Expand All @@ -240,30 +251,18 @@ def _persist_unpersisted_changes(self):
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
"""
logger.info(
"Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
len(self.unpersisted_users_changes)
)

unpersisted = self.unpersisted_users_changes
self.unpersisted_users_changes = set()

if unpersisted:
logger.info(
"Persisting %d upersisted presence updates", len(unpersisted)
)
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in unpersisted
])

logger.info("Finished _persist_unpersisted_changes")

@defer.inlineCallbacks
def _update_states_and_catch_exception(self, new_states):
try:
res = yield self._update_states(new_states)
defer.returnValue(res)
except Exception:
logger.exception("Error updating presence")

@defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
Expand Down Expand Up @@ -338,45 +337,41 @@ def _handle_timeouts(self):
logger.info("Handling presence timeouts")
now = self.clock.time_msec()

try:
with Measure(self.clock, "presence_handle_timeouts"):
# Fetch the list of users that *may* have timed out. Things may have
# changed since the timeout was set, so we won't necessarily have to
# take any action.
users_to_check = set(self.wheel_timer.fetch(now))

# Check whether the lists of syncing processes from an external
# process have expired.
expired_process_ids = [
process_id for process_id, last_update
in self.external_process_last_updated_ms.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
users_to_check.update(
self.external_process_last_updated_ms.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)
# Fetch the list of users that *may* have timed out. Things may have
# changed since the timeout was set, so we won't necessarily have to
# take any action.
users_to_check = set(self.wheel_timer.fetch(now))

# Check whether the lists of syncing processes from an external
# process have expired.
expired_process_ids = [
process_id for process_id, last_update
in self.external_process_last_updated_ms.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
users_to_check.update(
self.external_process_last_updated_ms.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)

states = [
self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id)
)
for user_id in users_to_check
]
states = [
self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id)
)
for user_id in users_to_check
]

timers_fired_counter.inc(len(states))
timers_fired_counter.inc(len(states))

changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=self.get_currently_syncing_users(),
now=now,
)
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=self.get_currently_syncing_users(),
now=now,
)

run_in_background(self._update_states_and_catch_exception, changes)
except Exception:
logger.exception("Exception in _handle_timeouts loop")
return self._update_states(changes)

@defer.inlineCallbacks
def bump_presence_active_time(self, user):
Expand Down