Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1296 from mozilla-services/feat/1291a
Browse files Browse the repository at this point in the history
feat: make gcm calls use async callbacks
  • Loading branch information
bbangert authored Oct 8, 2018
2 parents bf51b37 + d67dcb4 commit fbc4ab8
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 164 deletions.
55 changes: 31 additions & 24 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""GCM Router"""
from typing import Any # noqa

from requests.exceptions import ConnectionError, Timeout
from twisted.internet.threads import deferToThread
from twisted.logger import Logger
from twisted.internet.error import ConnectError, TimeoutError

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
Expand All @@ -28,15 +27,16 @@ def __init__(self, conf, router_conf, metrics):
self.dryRun = router_conf.get("dryrun", False)
self.collapseKey = router_conf.get("collapseKey")
timeout = router_conf.get("timeout", 10)
self.gcm = {}
self.gcmclients = {}
self.senderIDs = {}
# Flatten the SenderID list from human readable and init gcmclient
if not router_conf.get("senderIDs"):
raise IOError("SenderIDs not configured.")
for sid in router_conf.get("senderIDs"):
auth = router_conf.get("senderIDs").get(sid).get("auth")
self.senderIDs[sid] = auth
self.gcm[sid] = gcmclient.GCM(auth, timeout=timeout)
self.gcmclients[sid] = gcmclient.GCM(auth, timeout=timeout,
logger=self.log)
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")

Expand Down Expand Up @@ -69,7 +69,7 @@ def register(self, uaid, router_data, app_id, *args, **kwargs):
def route_notification(self, notification, uaid_data):
"""Start the GCM notification routing, returns a deferred"""
# Kick the entire notification routing off to a thread
return deferToThread(self._route, notification, uaid_data)
return self._route(notification, uaid_data)

def _route(self, notification, uaid_data):
"""Blocking GCM call to route the notification"""
Expand Down Expand Up @@ -111,41 +111,48 @@ def _route(self, notification, uaid_data):
data=data,
)
try:
gcm = self.gcm[router_data['creds']['senderID']]
result = gcm.send(payload)
except RouterException:
raise # pragma nocover
client = self.gcmclients[router_data['creds']['senderID']]
d = client.send(payload)
d.addCallback(
self._process_reply,
uaid_data,
router_ttl,
notification)

d.addErrback(
self._process_error
)
return d
except KeyError:
self.log.critical("Missing GCM bridge credentials")
raise RouterException("Server error", status_code=500,
errno=900)
except gcmclient.GCMAuthenticationError as e:
self.log.error("GCM Authentication Error: %s" % e)

def _process_error(self, failure):
err = failure.value
if isinstance(err, gcmclient.GCMAuthenticationError):
self.log.error("GCM Authentication Error: %s" % err)
raise RouterException("Server error", status_code=500,
errno=901)
except ConnectionError as e:
self.log.warn("GCM Unavailable: %s" % e)
if isinstance(err, TimeoutError):
self.log.warn("GCM Timeout: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
reason="timeout"))
raise RouterException("Server error", status_code=502,
errno=902,
errno=903,
log_exception=False)
except Timeout as e:
self.log.warn("GCM Timeout: %s" % e)
if isinstance(err, ConnectError):
self.log.warn("GCM Unavailable: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="timeout"))
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
errno=903,
errno=902,
log_exception=False)
except Exception as e:
self.log.error("Unhandled exception in GCM Routing: %s" % e)
raise RouterException("Server error", status_code=500)
return self._process_reply(result, uaid_data, ttl=router_ttl,
notification=notification)
return failure

def _error(self, err, status, **kwargs):
"""Error handler that raises the RouterException"""
Expand Down
73 changes: 48 additions & 25 deletions autopush/router/gcmclient.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json

import requests
import treq
from twisted.web.http_headers import Headers
from twisted.logger import Logger
from twisted.internet.error import ConnectError

from autopush.exceptions import RouterException

Expand All @@ -12,7 +15,7 @@ class GCMAuthenticationError(Exception):
class Result(object):
"""Abstraction object for GCM response"""

def __init__(self, message, response):
def __init__(self, response, message):
"""Process GCM message and response into abstracted object
:param message: Message payload
Expand All @@ -30,14 +33,13 @@ def __init__(self, message, response):
self.message = message
self.retry_message = None

self.retry_after = response.headers.get('Retry-After', None)
self.retry_after = (
response.headers.getRawHeaders('Retry-After') or [None])[0]

if response.status_code != 200:
self.retry_message = message
else:
self._parse_response(message, response.content)

def _parse_response(self, message, content):
def parse_response(self, content, code, message):
# 401 handled in GCM.process()
if code in (400, 404):
raise RouterException(content)
data = json.loads(content)
if not data.get('results'):
raise RouterException("Recv'd invalid response from GCM")
Expand All @@ -54,6 +56,7 @@ def _parse_response(self, message, content):
self.not_registered.append(reg_id)
else:
self.failed[reg_id] = res['error']
return self


class JSONMessage(object):
Expand Down Expand Up @@ -124,9 +127,31 @@ def __init__(self,
self._endpoint = "https://{}".format(endpoint)
self._api_key = api_key
self.metrics = metrics
self.log = logger
self.log = logger or Logger()
self._options = options
self._sender = requests.post
self._sender = treq.post

def process(self, response, payload):
if response.code == 401:
raise GCMAuthenticationError("Authentication Error")

result = Result(response, payload)

if 500 <= response.code <= 599:
result.retry_message = payload
return result

# Fetch the content body
d = response.text()
d.addCallback(result.parse_response, response.code, payload)
return d

def error(self, failure):
if isinstance(failure.value, GCMAuthenticationError) or \
isinstance(failure.value, ConnectError):
raise failure.value
self.log.error("GCMClient failure: {}".format(failure.value))
raise RouterException("Server error: {}".format(failure.value))

def send(self, payload):
"""Send a payload to GCM
Expand All @@ -136,23 +161,21 @@ def send(self, payload):
:return: Result
"""
headers = {
'Content-Type': 'application/json',
'Authorization': 'key={}'.format(self._api_key),
}
headers = Headers({
'Content-Type': ['application/json'],
'Authorization': ['key={}'.format(self._api_key)],
})

if 'timeout' not in self._options:
self._options['timeout'] = 3

response = self._sender(
d = self._sender(
url=self._endpoint,
headers=headers,
data=json.dumps(payload.payload),
**self._options
)

if response.status_code in (400, 404):
raise RouterException(response.content)

if response.status_code == 401:
raise GCMAuthenticationError("Authentication Error")

if response.status_code == 200 or (500 <= response.status_code <= 599):
return Result(payload, response)
# handle the immediate response (which contains no body)
d.addCallback(self.process, payload)
d.addErrback(self.error)
return d
Loading

0 comments on commit fbc4ab8

Please sign in to comment.