Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
fix: reset UAID if too many messages are pending
Browse files Browse the repository at this point in the history
closes #473
  • Loading branch information
jrconlin committed Nov 8, 2016
1 parent 04756c4 commit 461a866
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 0 deletions.
5 changes: 5 additions & 0 deletions autopush/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ class APNSException(Exception):
pass


class MessageOverloadException(Exception):
"""Too many messages per UAID"""
pass


class RouterException(AutopushException):
"""Exception if routing has failed, may include a custom status_code and
body to write to the response.
Expand Down
4 changes: 4 additions & 0 deletions autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ def add_shared_args(parser):
env_var="HUMAN_LOGS")
parser.add_argument('--no_aws', help="Skip AWS meta information checks",
action="store_true", default=False)
parser.add_argument('--msg_limit', help="Max limit for messages per uaid "
"before reset", type=int, default="100",
env_var="MSG_LIMIT")
# No ENV because this is for humans
add_external_router_args(parser)
obsolete_args(parser)
Expand Down Expand Up @@ -429,6 +432,7 @@ def make_settings(args, **kwargs):
wake_timeout=args.wake_timeout,
ami_id=ami_id,
client_certs=client_certs,
msg_limit=args.msg_limit,
**kwargs
)

Expand Down
2 changes: 2 additions & 0 deletions autopush/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def __init__(self,
preflight_uaid="deadbeef00000000deadbeef00000000",
ami_id=None,
client_certs=None,
msg_limit=100,
):
"""Initialize the Settings object
Expand Down Expand Up @@ -170,6 +171,7 @@ def __init__(self,
message_read_throughput=message_read_throughput,
message_write_throughput=message_write_throughput)
self._message_prefix = message_tablename
self.message_limit = msg_limit
self.storage = Storage(self.storage_table, self.metrics)
self.router = Router(self.router_table, self.metrics)

Expand Down
1 change: 1 addition & 0 deletions autopush/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ class TestArg:
fcm_auth = 'abcde'
ssl_key = "keys/server.crt"
ssl_cert = "keys/server.key"
msg_limit = 1000
_client_certs = dict(partner1=["1A:"*31 + "F9"],
partner2=["2B:"*31 + "E8",
"3C:"*31 + "D7"])
Expand Down
33 changes: 33 additions & 0 deletions autopush/tests/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import uuid
from hashlib import sha256
from collections import defaultdict

import twisted.internet.base
from autopush.tests.test_db import make_webpush_notification
Expand Down Expand Up @@ -1945,6 +1946,38 @@ def test_notif_finished_with_webpush_with_old_notifications(self):
ok_(self.proto.force_retry.called)
ok_(not self.send_mock.called)

def test_notif_finished_with_too_many_messages(self):
self._connect()
self.proto.ps.uaid = uuid.uuid4().hex
self.proto.ps.use_webpush = True
self.proto.ps._check_notifications = True
self.proto.ps.msg_limit = 2
self.proto.ap_settings.router.drop_user = Mock()
self.proto.ps.message.fetch_messages = Mock()

notif = make_webpush_notification(
self.proto.ps.uaid,
dummy_chid_str,
ttl=500
)
self.proto.ps.updates_sent = defaultdict(lambda: [])
self.proto.ps.message.fetch_messages.return_value = (
None,
[notif, notif, notif]
)

d = Deferred()

def check(*args, **kwargs):
ok_(self.proto.ap_settings.router.drop_user.called)
ok_(self.send_mock.called)
d.callback(True)

self.proto.force_retry = Mock()
self.proto.process_notifications()
self.proto.ps._notification_fetch.addBoth(check)
return d

def test_notification_results(self):
# Populate the database for ourself
uaid = uuid.uuid4().hex
Expand Down
18 changes: 18 additions & 0 deletions autopush/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
dump_uaid,
)
from autopush.db import Message # noqa
from autopush.exceptions import MessageOverloadException
from autopush.noseplugin import track_object
from autopush.protocol import IgnoreBody
from autopush.utils import (
Expand Down Expand Up @@ -167,6 +168,8 @@ class PushState(object):
'updates_sent',
'direct_updates',

'msg_limit',

# iProducer methods
'pauseProducing',
'resumeProducing',
Expand Down Expand Up @@ -215,6 +218,7 @@ def __init__(self, settings, request):

self._check_notifications = False
self._more_notifications = False
self.msg_limit = settings.message_limit

# Timestamp message defaults
self.scan_timestamps = False
Expand Down Expand Up @@ -262,6 +266,7 @@ class PushServerProtocol(WebSocketServerProtocol, policies.TimeoutMixin):
parent_class = WebSocketServerProtocol
randrange = randrange
_log_exc = True
sent_notification_count = 0

# Defer helpers
def deferToThread(self, func, *args, **kwargs):
Expand Down Expand Up @@ -871,6 +876,9 @@ def process_notifications(self):
d.addCallback(self.finish_notifications)
d.addErrback(self.error_notification_overload)
d.addErrback(self.trap_cancel)
d.addErrback(self.error_message_overload)
# The following errback closes the connection. It must be the last
# errback in the chain.
d.addErrback(self.error_notifications)
self.ps._notification_fetch = d

Expand All @@ -896,6 +904,12 @@ def error_notification_overload(self, fail):
d = self.deferToLater(randrange(5, 60), self.process_notifications)
d.addErrback(self.trap_cancel)

def error_message_overload(self, fail):
"""errBack for handling excessive messages per UAID"""
fail.trap(MessageOverloadException)
self.force_retry(self.ap_settings.router.drop_user(self.ps.uaid))
self.sendClose()

def finish_notifications(self, notifs):
"""callback for processing notifications from storage"""
self.ps._notification_fetch = None
Expand Down Expand Up @@ -956,6 +970,7 @@ def finish_webpush_notifications(self, result):
# No more notifications, and we've scanned timestamped.
self.ps._more_notifications = False
self.ps.scan_timestamps = False
self.sent_notification_count = 0
if self.ps._check_notifications:
# Told to check again, start over
self.ps._check_notifications = False
Expand Down Expand Up @@ -986,6 +1001,9 @@ def finish_webpush_notifications(self, result):
self.ps.updates_sent[str(notif.channel_id)].append(notif)
msg = notif.websocket_format()
messages_sent = True
self.sent_notification_count += 1
if self.sent_notification_count > self.ps.msg_limit:
raise MessageOverloadException()
self.sendJSON(msg)

# Did we send any messages?
Expand Down

0 comments on commit 461a866

Please sign in to comment.