From 44d02853ca42d21f0c709951926353aa2a672b86 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 6 Mar 2017 15:18:35 -0800 Subject: [PATCH] feat: Add timeout for internally routed messages. closes #821 --- autopush/main.py | 4 ++++ autopush/protocol.py | 3 ++- autopush/router/simple.py | 6 ++++-- autopush/router/webpush.py | 6 +++--- autopush/settings.py | 4 +++- autopush/tests/test_main.py | 1 + 6 files changed, 17 insertions(+), 7 deletions(-) diff --git a/autopush/main.py b/autopush/main.py index b8abbc73..24b0f3fc 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -134,6 +134,9 @@ def add_shared_args(parser): parser.add_argument('--router_write_throughput', help="DynamoDB router write throughput", type=int, default=5, env_var="ROUTER_WRITE_THROUGHPUT") + parser.add_argument('--connection_timeout', + help="Seconds to wait for connection timeout", + type=int, default=1, env_var="CONNECTION_TIMEOUT") parser.add_argument('--max_data', help="Max data segment length in bytes", default=4096, env_var='MAX_DATA') parser.add_argument('--env', @@ -448,6 +451,7 @@ def make_settings(args, **kwargs): ami_id=ami_id, client_certs=client_certs, msg_limit=args.msg_limit, + connect_timeout=args.connection_timeout, **kwargs ) diff --git a/autopush/protocol.py b/autopush/protocol.py index 5f35d04e..0662b8b8 100644 --- a/autopush/protocol.py +++ b/autopush/protocol.py @@ -20,7 +20,8 @@ def __init__(self, response, deferred): def ignore(cls, response): """Class method helper for ignoring the response""" d = Deferred() - response.deliverBody(cls(response, d)) + if response is not None: + response.deliverBody(cls(response, d)) return d def dataReceived(self, data): diff --git a/autopush/router/simple.py b/autopush/router/simple.py index c505182e..4070a11b 100644 --- a/autopush/router/simple.py +++ b/autopush/router/simple.py @@ -18,6 +18,7 @@ from twisted.internet.defer import ( inlineCallbacks, returnValue, + CancelledError, ) from twisted.internet.error import ( ConnectError, @@ -81,13 +82,14 @@ def route_notification(self, notification, uaid_data): try: result = yield self._send_notification(uaid, node_id, notification) - except (ConnectError, ConnectionClosed, ResponseFailed) as exc: + except (ConnectError, ConnectionClosed, ResponseFailed, + CancelledError) as exc: self.metrics.increment("updates.client.host_gone") yield deferToThread(router.clear_node, uaid_data).addErrback(self._eat_db_err) if isinstance(exc, ConnectionRefusedError): # Occurs if an IP record is now used by some other node - # in AWS. + # in AWS or if the connection timesout. self.log.debug("Could not route message: {exc}", exc=exc) if result and result.code == 200: self.metrics.increment("router.broadcast.hit") diff --git a/autopush/router/webpush.py b/autopush/router/webpush.py index d50c84d3..74d4e126 100644 --- a/autopush/router/webpush.py +++ b/autopush/router/webpush.py @@ -51,13 +51,13 @@ def _send_notification(self, uaid, node_id, notification): payload = notification.serialize() payload["timestamp"] = int(time.time()) url = node_id + "/push/" + uaid - d = self.ap_settings.agent.request( + request = self.ap_settings.agent.request( "PUT", url.encode("utf8"), bodyProducer=FileBodyProducer(StringIO(json.dumps(payload))), ) - d.addCallback(IgnoreBody.ignore) - return d + request.addCallback(IgnoreBody.ignore) + return request def _save_notification(self, uaid_data, notification): """Saves a notification, returns a deferred. diff --git a/autopush/settings.py b/autopush/settings.py index 5abf133f..1c2c34d2 100644 --- a/autopush/settings.py +++ b/autopush/settings.py @@ -92,6 +92,7 @@ def __init__(self, client_certs=None, msg_limit=100, debug=False, + connect_timeout=0.5, ): """Initialize the Settings object @@ -105,7 +106,8 @@ def __init__(self, if not debug: pool._factory = QuietClientFactory - self.agent = Agent(reactor, connectTimeout=5, pool=pool) + self.agent = Agent(reactor, connectTimeout=connect_timeout, + pool=pool) if not crypto_key: crypto_key = [Fernet.generate_key()] diff --git a/autopush/tests/test_main.py b/autopush/tests/test_main.py index 8fc43eae..174c28c6 100644 --- a/autopush/tests/test_main.py +++ b/autopush/tests/test_main.py @@ -255,6 +255,7 @@ class TestArg: partner2=["2B:"*31 + "E8", "3C:"*31 + "D7"]) client_certs = json.dumps(_client_certs) + connection_timeout = 1 def setUp(self): patchers = [