diff --git a/changelog.d/5275.bugfix b/changelog.d/5275.bugfix new file mode 100644 index 000000000000..45a554642a53 --- /dev/null +++ b/changelog.d/5275.bugfix @@ -0,0 +1 @@ +Fix "db txn 'update_presence' from sentinel context" log messages. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 59d53f105059..6209858bbb9a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -182,17 +182,27 @@ def __init__(self, hs): # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. + def run_timeout_handler(): + return run_as_background_process( + "handle_presence_timeouts", self._handle_timeouts + ) + self.clock.call_later( 30, self.clock.looping_call, - self._handle_timeouts, + run_timeout_handler, 5000, ) + def run_persister(): + return run_as_background_process( + "persist_presence_changes", self._persist_unpersisted_changes + ) + self.clock.call_later( 60, self.clock.looping_call, - self._persist_unpersisted_changes, + run_persister, 60 * 1000, ) @@ -229,6 +239,7 @@ def _on_shutdown(self): ) if self.unpersisted_users_changes: + yield self.store.update_presence([ self.user_to_current_state[user_id] for user_id in self.unpersisted_users_changes @@ -240,30 +251,18 @@ def _persist_unpersisted_changes(self): """We periodically persist the unpersisted changes, as otherwise they may stack up and slow down shutdown times. """ - logger.info( - "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes", - len(self.unpersisted_users_changes) - ) - unpersisted = self.unpersisted_users_changes self.unpersisted_users_changes = set() if unpersisted: + logger.info( + "Persisting %d upersisted presence updates", len(unpersisted) + ) yield self.store.update_presence([ self.user_to_current_state[user_id] for user_id in unpersisted ]) - logger.info("Finished _persist_unpersisted_changes") - - @defer.inlineCallbacks - def _update_states_and_catch_exception(self, new_states): - try: - res = yield self._update_states(new_states) - defer.returnValue(res) - except Exception: - logger.exception("Error updating presence") - @defer.inlineCallbacks def _update_states(self, new_states): """Updates presence of users. Sets the appropriate timeouts. Pokes @@ -338,45 +337,41 @@ def _handle_timeouts(self): logger.info("Handling presence timeouts") now = self.clock.time_msec() - try: - with Measure(self.clock, "presence_handle_timeouts"): - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = set(self.wheel_timer.fetch(now)) - - # Check whether the lists of syncing processes from an external - # process have expired. - expired_process_ids = [ - process_id for process_id, last_update - in self.external_process_last_updated_ms.items() - if now - last_update > EXTERNAL_PROCESS_EXPIRY - ] - for process_id in expired_process_ids: - users_to_check.update( - self.external_process_last_updated_ms.pop(process_id, ()) - ) - self.external_process_last_update.pop(process_id) + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = set(self.wheel_timer.fetch(now)) + + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_updated_ms.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_last_updated_ms.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in users_to_check - ] + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in users_to_check + ] - timers_fired_counter.inc(len(states)) + timers_fired_counter.inc(len(states)) - changes = handle_timeouts( - states, - is_mine_fn=self.is_mine_id, - syncing_user_ids=self.get_currently_syncing_users(), - now=now, - ) + changes = handle_timeouts( + states, + is_mine_fn=self.is_mine_id, + syncing_user_ids=self.get_currently_syncing_users(), + now=now, + ) - run_in_background(self._update_states_and_catch_exception, changes) - except Exception: - logger.exception("Exception in _handle_timeouts loop") + return self._update_states(changes) @defer.inlineCallbacks def bump_presence_active_time(self, user):