Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: address HTTP2 errors in APNS
Browse files Browse the repository at this point in the history
Converts two errors into metrics and attempts retry on Connection or
HTTP2 errors.

Closes #1052
  • Loading branch information
jrconlin committed Oct 23, 2017
1 parent 898d702 commit a6ae7f1
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 55 deletions.
2 changes: 1 addition & 1 deletion autopush/noseplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Plugin(object): # type: ignore

try:
from pympler import asizeof
except:
except ImportError:
asizeof = None


Expand Down
69 changes: 40 additions & 29 deletions autopush/router/apns2.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def __init__(self, cert_file, key_file, topic,
alt=False, use_sandbox=False,
max_connections=APNS_MAX_CONNECTIONS,
logger=None, metrics=None,
load_connections=True):
load_connections=True,
max_retry=2):
"""Create the APNS client connector.
The cert_file and key_file can be derived from the exported `.p12`
Expand Down Expand Up @@ -76,6 +77,8 @@ def __init__(self, cert_file, key_file, topic,
:type metrics: autopush.metrics.IMetric
:param load_connections: used for testing
:type load_connections: bool
:param max_retry: Number of HTTP2 transmit attempts
:type max_retry: int
"""
self.server = SANDBOX if use_sandbox else SERVER
Expand All @@ -84,6 +87,7 @@ def __init__(self, cert_file, key_file, topic,
self.metrics = metrics
self.topic = topic
self._max_connections = max_connections
self._max_retry = max_retry
self.connections = deque(maxlen=max_connections)
if load_connections:
self.ssl_context = hyper.tls.init_context(cert=(cert_file,
Expand Down Expand Up @@ -128,34 +132,41 @@ def send(self, router_token, payload, apns_id,
if exp:
headers['apns-expiration'] = str(exp)
url = '/3/device/' + router_token
connection = self._get_connection()
try:
# request auto-opens closed connections, so if a connection
# has timed out or failed for other reasons, it's automatically
# re-established.
stream_id = connection.request(
'POST', url=url, body=body, headers=headers)
# get_response() may return an AttributeError. Not really sure
# how it happens, but the connected socket may get set to None.
# We'll treat that as a premature socket closure.
response = connection.get_response(stream_id)
if response.status != 200:
reason = json.loads(response.read().decode('utf-8'))['reason']
raise RouterException(
"APNS Transmit Error {}:{}".format(response.status,
reason),
status_code=502,
response_body="APNS could not process "
"your message {}".format(reason),
log_exception=False
)
except (HTTP20Error, AttributeError):
connection.close()
raise
finally:
# Returning a closed connection to the pool is ok.
# hyper will reconnect on .request()
self._return_connection(connection)
attempt = 0
while True:
try:
connection = self._get_connection()
# request auto-opens closed connections, so if a connection
# has timed out or failed for other reasons, it's automatically
# re-established.
stream_id = connection.request(
'POST', url=url, body=body, headers=headers)
# get_response() may raise an AttributeError. Not really sure
# how it happens, but the connected socket may get set to None.
# We'll treat that as a premature socket closure.
response = connection.get_response(stream_id)
if response.status != 200:
reason = json.loads(
response.read().decode('utf-8'))['reason']
raise RouterException(
"APNS Transmit Error {}:{}".format(response.status,
reason),
status_code=502,
response_body="APNS could not process "
"your message {}".format(reason),
log_exception=False
)
break
except HTTP20Error:
connection.close()
attempt += 1
if attempt < self._max_retry:
continue
raise
finally:
# Returning a closed connection to the pool is ok.
# hyper will reconnect on .request()
self._return_connection(connection)

def _get_connection(self):
try:
Expand Down
33 changes: 17 additions & 16 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from hyper.http20.exceptions import ConnectionError, HTTP20Error
from twisted.internet.threads import deferToThread
from twisted.logger import Logger
from twisted.python.failure import Failure

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
Expand Down Expand Up @@ -46,7 +45,9 @@ def _connect(self, rel_channel, load_connections=True):
topic=cert_info.get("topic", default_topic),
logger=self.log,
metrics=self.metrics,
load_connections=load_connections)
load_connections=load_connections,
max_retry=cert_info.get('max_retry', 2)
)

def __init__(self, conf, router_conf, metrics, load_connections=True):
"""Create a new APNS router and connect to APNS
Expand Down Expand Up @@ -139,31 +140,31 @@ def _route(self, notification, router_data):
"alert": {"title": " ", "body": " "}
})
apns_id = str(uuid.uuid4()).lower()
# APNs may force close a connection on us without warning.
# if that happens, retry the message.
success = False
try:
apns_client.send(router_token=router_token, payload=payload,
apns_id=apns_id)
except (ConnectionError, AttributeError) as ex:
self.metrics.increment("notification.bridge.error",
success = True
except ConnectionError:
self.metrics.increment("notification.bridge.connection.error",
tags=make_tags(
self._base_tags,
application=rel_channel,
reason="connection_error"))
except HTTP20Error:
self.metrics.increment("notification.bridge.connection.error",
tags=make_tags(self._base_tags,
application=rel_channel,
reason="connection_error"))
self.log.error("Connection Error sending to APNS",
log_failure=Failure(ex))
reason="http2_error"))
if not success:
raise RouterException(
"Server error",
status_code=502,
response_body="APNS returned an error processing request",
log_exception=False,
)
except HTTP20Error as ex:
self.log.error("HTTP2 Error sending to APNS",
log_failure=Failure(ex))
raise RouterException(
"Server error",
status_code=502,
response_body="APNS returned an error processing request",
)

location = "%s/m/%s" % (self.conf.endpoint_url, notification.version)
self.metrics.increment("notification.bridge.sent",
tags=make_tags(self._base_tags,
Expand Down
2 changes: 1 addition & 1 deletion autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, conf, router_conf, metrics):
self.senderIDs[sid] = auth
try:
self.gcm[sid] = gcmclient.GCM(auth)
except:
except Exception:
raise IOError("GCM Bridge not initiated in main")
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")
Expand Down
2 changes: 1 addition & 1 deletion autopush/tests/certs/makecerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def make_cert(filename, cacert=None, cakey=None):
subject.ST = b"Çorum"
subject.L = b"Başmakçı"
subject.CN = b"localhost"
subject.O = b"Mozilla Test"
subject.O = b"Mozilla Test" # noqa: E741
subject.OU = b"Autopush Test %s" % filename
subject.emailAddress = b"[email protected]"
subjectAltName = X509Extension(b'subjectAltName', False, b'DNS:localhost')
Expand Down
2 changes: 1 addition & 1 deletion autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def get_notification(self, timeout=1):
d = self.ws.recv()
log.debug("Recv: %s", d)
return json.loads(d)
except:
except Exception:
return None
finally:
self.ws.settimeout(orig_timeout)
Expand Down
2 changes: 1 addition & 1 deletion autopush/tests/test_web_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class TestX(Exception):

try:
raise TestX()
except:
except Exception:
exc_info = sys.exc_info()

self.base.write_error(999, exc_info=exc_info)
Expand Down
4 changes: 2 additions & 2 deletions autopush/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def f():
try:
result = func(*args, **kwargs)
d.callback(result)
except:
except Exception:
d.errback(failure.Failure())
reactor.callLater(when, f)
return d
Expand Down Expand Up @@ -498,7 +498,7 @@ def onMessage(self, payload, isBinary):
data = None
try:
data = json.loads(payload.decode('utf8'))
except:
except (TypeError, ValueError):
pass

if not isinstance(data, dict):
Expand Down
9 changes: 6 additions & 3 deletions configs/autopush_shared.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ endpoint_port = 8082
; "key": "path_to_key_file",
; "topic": "com.mozilla.org.Firefox", // Bundle ID for associated app
; "max_connections": 100, // Max number of connection pool entries.
; "sandbox": False}, // Use the APNs sandbox feature
; ... }
; e.g {"firefox":{"cert":"certs/main.cert","key":"certs/main.key","topic":"com.mozilla.org.Firefox"},"beta":{"cert":"certs/beta.cert","key":"certs/beta.key","topic":"com.mozilla.org.FirefoxBeta"}}
; "sandbox": False, // Use the APNs sandbox feature
; "max_retry": 2, // Max number of transmit attempts (including the first) when an HTTP2 error occurs
; },
; ...
; }
; e.g {"firefox":{"cert":"certs/main.cert","key":"certs/main.key","topic":"com.mozilla.org.Firefox","max_retry":2},"beta":{"cert":"certs/beta.cert","key":"certs/beta.key","topic":"com.mozilla.org.FirefoxBeta"}}
#apns_creds =

0 comments on commit a6ae7f1

Please sign in to comment.