Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix "db txn 'update_presence' from sentinel context" log messages (#5275
Browse files Browse the repository at this point in the history
)

Fixes #4414.
  • Loading branch information
richvdh authored May 28, 2019
1 parent a97d4e2 commit 5726378
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 52 deletions.
1 change: 1 addition & 0 deletions changelog.d/5275.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix "db txn 'update_presence' from sentinel context" log messages.
99 changes: 47 additions & 52 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,27 @@ def __init__(self, hs):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
def run_timeout_handler():
return run_as_background_process(
"handle_presence_timeouts", self._handle_timeouts
)

self.clock.call_later(
30,
self.clock.looping_call,
self._handle_timeouts,
run_timeout_handler,
5000,
)

def run_persister():
return run_as_background_process(
"persist_presence_changes", self._persist_unpersisted_changes
)

self.clock.call_later(
60,
self.clock.looping_call,
self._persist_unpersisted_changes,
run_persister,
60 * 1000,
)

Expand Down Expand Up @@ -229,6 +239,7 @@ def _on_shutdown(self):
)

if self.unpersisted_users_changes:

yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in self.unpersisted_users_changes
Expand All @@ -240,30 +251,18 @@ def _persist_unpersisted_changes(self):
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
"""
logger.info(
"Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
len(self.unpersisted_users_changes)
)

unpersisted = self.unpersisted_users_changes
self.unpersisted_users_changes = set()

if unpersisted:
logger.info(
"Persisting %d upersisted presence updates", len(unpersisted)
)
yield self.store.update_presence([
self.user_to_current_state[user_id]
for user_id in unpersisted
])

logger.info("Finished _persist_unpersisted_changes")

@defer.inlineCallbacks
def _update_states_and_catch_exception(self, new_states):
try:
res = yield self._update_states(new_states)
defer.returnValue(res)
except Exception:
logger.exception("Error updating presence")

@defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
Expand Down Expand Up @@ -338,45 +337,41 @@ def _handle_timeouts(self):
logger.info("Handling presence timeouts")
now = self.clock.time_msec()

try:
with Measure(self.clock, "presence_handle_timeouts"):
# Fetch the list of users that *may* have timed out. Things may have
# changed since the timeout was set, so we won't necessarily have to
# take any action.
users_to_check = set(self.wheel_timer.fetch(now))

# Check whether the lists of syncing processes from an external
# process have expired.
expired_process_ids = [
process_id for process_id, last_update
in self.external_process_last_updated_ms.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
users_to_check.update(
self.external_process_last_updated_ms.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)
# Fetch the list of users that *may* have timed out. Things may have
# changed since the timeout was set, so we won't necessarily have to
# take any action.
users_to_check = set(self.wheel_timer.fetch(now))

# Check whether the lists of syncing processes from an external
# process have expired.
expired_process_ids = [
process_id for process_id, last_update
in self.external_process_last_updated_ms.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
users_to_check.update(
self.external_process_last_updated_ms.pop(process_id, ())
)
self.external_process_last_update.pop(process_id)

states = [
self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id)
)
for user_id in users_to_check
]
states = [
self.user_to_current_state.get(
user_id, UserPresenceState.default(user_id)
)
for user_id in users_to_check
]

timers_fired_counter.inc(len(states))
timers_fired_counter.inc(len(states))

changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=self.get_currently_syncing_users(),
now=now,
)
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
syncing_user_ids=self.get_currently_syncing_users(),
now=now,
)

run_in_background(self._update_states_and_catch_exception, changes)
except Exception:
logger.exception("Exception in _handle_timeouts loop")
return self._update_states(changes)

@defer.inlineCallbacks
def bump_presence_active_time(self, user):
Expand Down

0 comments on commit 5726378

Please sign in to comment.