Skip to content

Commit

Permalink
In-flight request limiting (#132)
Browse files Browse the repository at this point in the history
* Introduce the abstract ConcurrencyLimitedPushkin

* Switch over APNS and GCM to be concurrency limited

* Add logging to tests!

How am I meant to know what's going on otherwise?!

* Introduce the ability to make multiple concurrent requests in tests

* Test concurrency limiting

* Improve readme with alternative configuration option

* Add in-flight request limiting to example config

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <[email protected]>

* Remove forgotten debugging lines
  • Loading branch information
reivilibre authored Jul 16, 2020
1 parent f74ea3b commit ad2ff36
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 18 deletions.
6 changes: 5 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ via `POST /_matrix/client/r0/pushers/set <https://matrix.org/docs/spec/client_se
Running
=======

``python -m sygnal.sygnal``
With default configuration file name of ``sygnal.yaml``:
``python -m sygnal.sygnal``

With custom configuration file name:
``SYGNAL_CONF=/path/to/custom_sygnal.conf python -m sygnal.sygnal``

Python 3.7 or higher is required.

Expand Down
1 change: 1 addition & 0 deletions changelog.d/132.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for per-pushkin in-flight request limiting.
27 changes: 27 additions & 0 deletions sygnal.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ apps:
#com.example.myapp.ios:
# type: apns
# certfile: com.example.myApp_prod_APNS.pem
#
# # This is the maximum number of in-flight requests *for this pushkin*
# # before additional notifications will be failed.
# # (This is a robustness measure to prevent one pushkin stacking up with
# # queued requests and saturating the inbound connection queue of a load
# # balancer or reverse proxy).
# # Defaults to 512 if unset.
# #
# #inflight_request_limit: 512

# This is an example APNs push configuration using key authentication.
#
Expand All @@ -200,6 +209,15 @@ apps:
# key_id: MY_KEY_ID
# team_id: MY_TEAM_ID
# topic: MY_TOPIC
#
# # This is the maximum number of in-flight requests *for this pushkin*
# # before additional notifications will be failed.
# # (This is a robustness measure to prevent one pushkin stacking up with
# # queued requests and saturating the inbound connection queue of a load
# # balancer or reverse proxy).
# # Defaults to 512 if unset.
# #
# #inflight_request_limit: 512

# This is an example GCM/FCM push configuration.
#
Expand All @@ -209,3 +227,12 @@ apps:
# # This is the maximum number of connections to GCM servers at any one time
# # the default is 20.
# #max_connections: 20
#
# # This is the maximum number of in-flight requests *for this pushkin*
# # before additional notifications will be failed.
# # (This is a robustness measure to prevent one pushkin stacking up with
# # queued requests and saturating the inbound connection queue of a load
# # balancer or reverse proxy).
# # Defaults to 512 if unset.
# #
# #inflight_request_limit: 512
8 changes: 4 additions & 4 deletions sygnal/apnspushkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
PushkinSetupException,
TemporaryNotificationDispatchException,
)
from sygnal.notifications import Pushkin
from sygnal.notifications import ConcurrencyLimitedPushkin
from sygnal.utils import NotificationLoggerAdapter, twisted_sleep

logger = logging.getLogger(__name__)
Expand All @@ -63,7 +63,7 @@
)


class ApnsPushkin(Pushkin):
class ApnsPushkin(ConcurrencyLimitedPushkin):
"""
Relays notifications to the Apple Push Notification Service.
"""
Expand All @@ -86,7 +86,7 @@ class ApnsPushkin(Pushkin):
"key_id",
"keyfile",
"topic",
}
} | ConcurrencyLimitedPushkin.UNDERSTOOD_CONFIG_FIELDS

def __init__(self, name, sygnal, config):
super().__init__(name, sygnal, config)
Expand Down Expand Up @@ -215,7 +215,7 @@ async def _dispatch_request(self, log, span, device, shaved_payload, prio):
f"{response.status} {response.description}"
)

async def dispatch_notification(self, n, device, context):
async def _dispatch_notification_unlimited(self, n, device, context):
log = NotificationLoggerAdapter(logger, {"request_id": context.request_id})

# The pushkey is kind of secret because you can use it to send push
Expand Down
11 changes: 7 additions & 4 deletions sygnal/gcmpushkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from sygnal.utils import NotificationLoggerAdapter, twisted_sleep

from .exceptions import PushkinSetupException
from .notifications import Pushkin
from .notifications import ConcurrencyLimitedPushkin

QUEUE_TIME_HISTOGRAM = Histogram(
"sygnal_gcm_queue_time", "Time taken waiting for a connection to GCM"
Expand Down Expand Up @@ -87,12 +87,15 @@
DEFAULT_MAX_CONNECTIONS = 20


class GcmPushkin(Pushkin):
class GcmPushkin(ConcurrencyLimitedPushkin):
"""
Pushkin that relays notifications to Google/Firebase Cloud Messaging.
"""

UNDERSTOOD_CONFIG_FIELDS = {"type", "api_key"}
UNDERSTOOD_CONFIG_FIELDS = {
"type",
"api_key",
} | ConcurrencyLimitedPushkin.UNDERSTOOD_CONFIG_FIELDS

def __init__(self, name, sygnal, config, canonical_reg_id_store):
super(GcmPushkin, self).__init__(name, sygnal, config)
Expand Down Expand Up @@ -293,7 +296,7 @@ async def _request_dispatch(self, n, log, body, headers, pushkeys, span):
f"Unknown GCM response code {response.code}"
)

async def dispatch_notification(self, n, device, context):
async def _dispatch_notification_unlimited(self, n, device, context):
log = NotificationLoggerAdapter(logger, {"request_id": context.request_id})

pushkeys = [
Expand Down
60 changes: 54 additions & 6 deletions sygnal/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, Optional
import typing
from typing import Any, Dict, List, Optional

from .exceptions import InvalidNotificationException
from .exceptions import InvalidNotificationException, NotificationDispatchException

if typing.TYPE_CHECKING:
from .sygnal import Sygnal


class Tweaks:
Expand Down Expand Up @@ -89,17 +93,19 @@ def __init__(self, notif):


class Pushkin(object):
def __init__(self, name, sygnal, config):
def __init__(self, name: str, sygnal: "Sygnal", config: Dict[str, Any]):
self.name = name
self.cfg = config
self.sygnal = sygnal

def get_config(self, key, default=None):
def get_config(self, key: str, default=None):
if key not in self.cfg:
return default
return self.cfg[key]

async def dispatch_notification(self, n, device, context):
async def dispatch_notification(
self, n: Notification, device: Device, context: "NotificationContext"
) -> List[str]:
"""
Args:
n: The notification to dispatch via this pushkin
Expand All @@ -112,7 +118,7 @@ async def dispatch_notification(self, n, device, context):
pass

@classmethod
async def create(cls, name, sygnal, config):
async def create(cls, name: str, sygnal: "Sygnal", config: Dict[str, Any]):
"""
Override this if your pushkin needs to call async code in order to
be constructed. Otherwise, it defaults to just invoking the Python-standard
Expand All @@ -124,6 +130,48 @@ async def create(cls, name, sygnal, config):
return cls(name, sygnal, config)


class ConcurrencyLimitedPushkin(Pushkin):
"""
A subclass of Pushkin that limits the number of in-flight requests at any
one time, so as to prevent one Pushkin pulling the whole show down.
"""

# Maximum in-flight, concurrent notification dispatches that we apply by default
# We start turning away requests after this limit is reached.
DEFAULT_CONCURRENCY_LIMIT = 512

UNDERSTOOD_CONFIG_FIELDS = {"inflight_request_limit"}

def __init__(self, name: str, sygnal: "Sygnal", config: Dict[str, Any]):
super(ConcurrencyLimitedPushkin, self).__init__(name, sygnal, config)
self._concurrent_limit = config.get(
"inflight_request_limit",
ConcurrencyLimitedPushkin.DEFAULT_CONCURRENCY_LIMIT,
)
self._concurrent_now = 0

async def dispatch_notification(
self, n: Notification, device: Device, context: "NotificationContext"
) -> List[str]:
if self._concurrent_now >= self._concurrent_limit:
raise NotificationDispatchException(
"Too many in-flight requests for this pushkin. "
"(Something is wrong and Sygnal is struggling to keep up!)"
)

self._concurrent_now += 1
try:
return await self._dispatch_notification_unlimited(n, device, context)
finally:
self._concurrent_now -= 1

async def _dispatch_notification_unlimited(
self, n: Notification, device: Device, context: "NotificationContext"
) -> List[str]:
# to be overridden by Pushkins!
raise NotImplementedError


class NotificationContext(object):
def __init__(self, request_id, opentracing_span, start_time):
"""
Expand Down
95 changes: 95 additions & 0 deletions tests/test_concurrency_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# Copyright 2019–2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sygnal.notifications import ConcurrencyLimitedPushkin
from sygnal.utils import twisted_sleep

from tests.testutils import TestCase

DEVICE_GCM1_EXAMPLE = {
"app_id": "com.example.gcm",
"pushkey": "spqrg",
"pushkey_ts": 42,
}
DEVICE_GCM2_EXAMPLE = {
"app_id": "com.example.gcm",
"pushkey": "spqrh",
"pushkey_ts": 42,
}
DEVICE_APNS_EXAMPLE = {
"app_id": "com.example.apns",
"pushkey": "spqra",
"pushkey_ts": 42,
}


class SlowConcurrencyLimitedDummyPushkin(ConcurrencyLimitedPushkin):
async def _dispatch_notification_unlimited(self, n, device, context):
"""
We will deliver the notification to the mighty nobody
and we will take one second to do it, because we are slow!
"""
await twisted_sleep(1.0, self.sygnal.reactor)
return []


class ConcurrencyLimitTestCase(TestCase):
def config_setup(self, config):
super(ConcurrencyLimitTestCase, self).config_setup(config)
config["apps"]["com.example.gcm"] = {
"type": "tests.test_concurrency_limit.SlowConcurrencyLimitedDummyPushkin",
"inflight_request_limit": 1,
}
config["apps"]["com.example.apns"] = {
"type": "tests.test_concurrency_limit.SlowConcurrencyLimitedDummyPushkin",
"inflight_request_limit": 1,
}

def test_passes_under_limit_one(self):
"""
Tests that a push notification succeeds if it is under the limit.
"""
resp = self._request(self._make_dummy_notification([DEVICE_GCM1_EXAMPLE]))

self.assertEqual(resp, {"rejected": []})

def test_passes_under_limit_multiple_no_interfere(self):
"""
Tests that 2 push notifications succeed if they are to different
pushkins (so do not hit a per-pushkin limit).
"""
resp = self._request(
self._make_dummy_notification([DEVICE_GCM1_EXAMPLE, DEVICE_APNS_EXAMPLE])
)

self.assertEqual(resp, {"rejected": []})

def test_fails_when_limit_hit(self):
"""
Tests that 1 of 2 push notifications fail if they are to the same pushkins
(so do hit the per-pushkin limit of 1).
"""
resp = self._multi_requests(
[
self._make_dummy_notification([DEVICE_GCM1_EXAMPLE]),
self._make_dummy_notification([DEVICE_GCM2_EXAMPLE]),
]
)

# request 0 will succeed
self.assertEqual(resp[0], {"rejected": []})

# request 1 will fail because request 0 has filled the limit
self.assertEqual(resp[1], 502)
Loading

0 comments on commit ad2ff36

Please sign in to comment.