Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Run things as background processes #3556

Merged
merged 3 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3556.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add metrics to track resource usage by background processes
15 changes: 6 additions & 9 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def notify_new_events(self, current_id):

# fire off a processing loop in the background
run_as_background_process(
"process_transaction_queue",
"process_event_queue_for_federation",
self._process_event_queue_loop,
)

Expand Down Expand Up @@ -434,14 +434,11 @@ def _attempt_new_transaction(self, destination):

logger.debug("TX [%s] Starting transaction loop", destination)

# Drop the logcontext before starting the transaction. It doesn't
# really make sense to log all the outbound transactions against
# whatever path led us to this point: that's pretty arbitrary really.
#
# (this also means we can fire off _perform_transaction without
# yielding)
with logcontext.PreserveLoggingContext():
self._transaction_transmission_loop(destination)
run_as_background_process(
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
destination,
)

@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
Expand Down
10 changes: 8 additions & 2 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process

from . import engines
from ._base import SQLBaseStore

Expand Down Expand Up @@ -87,10 +89,14 @@ def __init__(self, db_conn, hs):
self._background_update_handlers = {}
self._all_done = False

@defer.inlineCallbacks
def start_doing_background_updates(self):
logger.info("Starting background schema updates")
run_as_background_process(
"background_updates", self._run_background_updates,
)

@defer.inlineCallbacks
def _run_background_updates(self):
logger.info("Starting background schema updates")
while True:
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
Expand Down
15 changes: 11 additions & 4 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR

from . import background_updates
Expand Down Expand Up @@ -93,10 +94,16 @@ def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
self._batch_row_update[key] = (user_agent, device_id, now)

def _update_client_ips_batch(self):
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
def update():
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn,
to_update,
)

run_as_background_process(
"update_client_ips", update,
)

def _update_client_ips_batch_txn(self, txn, to_update):
Expand Down
10 changes: 4 additions & 6 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -155,11 +156,8 @@ def handle_queue_loop():
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)

# set handle_queue_loop off on the background. We don't want to
# attribute work done in it to the current request, so we drop the
# logcontext altogether.
with PreserveLoggingContext():
handle_queue_loop()
# set handle_queue_loop off in the background
run_as_background_process("persist_events", handle_queue_loop)

def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
Expand Down
10 changes: 6 additions & 4 deletions synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
Expand Down Expand Up @@ -322,10 +323,11 @@ def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
should_start = False

if should_start:
with PreserveLoggingContext():
self.runWithConnection(
self._do_fetch
)
run_as_background_process(
"fetch_events",
self.runWithConnection,
self._do_fetch,
)

logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():
Expand Down
6 changes: 5 additions & 1 deletion synapse/util/caches/expiringcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging
from collections import OrderedDict

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -63,7 +64,10 @@ def start(self):
return

def f():
self._prune_cache()
run_as_background_process(
"prune_cache_%s" % self._cache_name,
self._prune_cache,
)

self._clock.looping_call(f, self._expiry_ms / 2)

Expand Down
48 changes: 22 additions & 26 deletions synapse/util/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

from twisted.internet import defer

from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background

logger = logging.getLogger(__name__)


def user_left_room(distributor, user, room_id):
with PreserveLoggingContext():
distributor.fire("user_left_room", user=user, room_id=room_id)
distributor.fire("user_left_room", user=user, room_id=room_id)


def user_joined_room(distributor, user, room_id):
with PreserveLoggingContext():
distributor.fire("user_joined_room", user=user, room_id=room_id)
distributor.fire("user_joined_room", user=user, room_id=room_id)


class Distributor(object):
Expand All @@ -44,9 +42,7 @@ class Distributor(object):
model will do for today.
"""

def __init__(self, suppress_failures=True):
self.suppress_failures = suppress_failures

def __init__(self):
self.signals = {}
self.pre_registration = {}

Expand All @@ -56,7 +52,6 @@ def declare(self, name):

self.signals[name] = Signal(
name,
suppress_failures=self.suppress_failures,
)

if name in self.pre_registration:
Expand All @@ -75,10 +70,18 @@ def observe(self, name, observer):
self.pre_registration[name].append(observer)

def fire(self, name, *args, **kwargs):
"""Dispatches the given signal to the registered observers.

Runs the observers as a background process. Does not return a deferred.
"""
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))

return self.signals[name].fire(*args, **kwargs)
run_as_background_process(
name,
self.signals[name].fire,
*args, **kwargs
)


class Signal(object):
Expand All @@ -91,9 +94,8 @@ class Signal(object):
method into all of the observers.
"""

def __init__(self, name, suppress_failures):
def __init__(self, name):
self.name = name
self.suppress_failures = suppress_failures
self.observers = []

def observe(self, observer):
Expand All @@ -103,7 +105,6 @@ def observe(self, observer):
Each observer callable may return a Deferred."""
self.observers.append(observer)

@defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
Expand All @@ -121,22 +122,17 @@ def eb(failure):
failure.type,
failure.value,
failure.getTracebackObject()))
if not self.suppress_failures:
return failure

return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)

with PreserveLoggingContext():
deferreds = [
do(observer)
for observer in self.observers
]

res = yield defer.gatherResults(
deferreds, consumeErrors=True
).addErrback(unwrapFirstError)
deferreds = [
run_in_background(do, o)
for o in self.observers
]

defer.returnValue(res)
return make_deferred_yieldable(defer.gatherResults(
deferreds, consumeErrors=True,
))

def __repr__(self):
return "<Signal name=%r>" % (self.name,)
56 changes: 4 additions & 52 deletions tests/test_distributor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,8 +16,6 @@

from mock import Mock, patch

from twisted.internet import defer

from synapse.util.distributor import Distributor

from . import unittest
Expand All @@ -27,38 +26,15 @@ class DistributorTestCase(unittest.TestCase):
def setUp(self):
self.dist = Distributor()

@defer.inlineCallbacks
def test_signal_dispatch(self):
self.dist.declare("alert")

observer = Mock()
self.dist.observe("alert", observer)

d = self.dist.fire("alert", 1, 2, 3)
yield d
self.assertTrue(d.called)
self.dist.fire("alert", 1, 2, 3)
observer.assert_called_with(1, 2, 3)

@defer.inlineCallbacks
def test_signal_dispatch_deferred(self):
self.dist.declare("whine")

d_inner = defer.Deferred()

def observer():
return d_inner

self.dist.observe("whine", observer)

d_outer = self.dist.fire("whine")

self.assertFalse(d_outer.called)

d_inner.callback(None)
yield d_outer
self.assertTrue(d_outer.called)

@defer.inlineCallbacks
def test_signal_catch(self):
self.dist.declare("alarm")

Expand All @@ -71,9 +47,7 @@ def test_signal_catch(self):
with patch(
"synapse.util.distributor.logger", spec=["warning"]
) as mock_logger:
d = self.dist.fire("alarm", "Go")
yield d
self.assertTrue(d.called)
self.dist.fire("alarm", "Go")

observers[0].assert_called_once_with("Go")
observers[1].assert_called_once_with("Go")
Expand All @@ -83,34 +57,12 @@ def test_signal_catch(self):
mock_logger.warning.call_args[0][0], str
)

@defer.inlineCallbacks
def test_signal_catch_no_suppress(self):
# Gut-wrenching
self.dist.suppress_failures = False

self.dist.declare("whail")

class MyException(Exception):
pass

@defer.inlineCallbacks
def observer():
raise MyException("Oopsie")

self.dist.observe("whail", observer)

d = self.dist.fire("whail")

yield self.assertFailure(d, MyException)
self.dist.suppress_failures = True

@defer.inlineCallbacks
def test_signal_prereg(self):
observer = Mock()
self.dist.observe("flare", observer)

self.dist.declare("flare")
yield self.dist.fire("flare", 4, 5)
self.dist.fire("flare", 4, 5)

observer.assert_called_with(4, 5)

Expand Down