Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: make simplepush protocol optional
Browse files Browse the repository at this point in the history
Preliminary work for removing simplepush protocol by making it optional
(default on)

BREAKING: Simplepush may now be disabled on the server at any time in
the future, and may occur without warning.

Issue #799
  • Loading branch information
jrconlin committed Jul 25, 2017
1 parent cd99420 commit 876c959
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 170 deletions.
8 changes: 6 additions & 2 deletions autopush/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ def _for_handler(cls, ap_settings, **kwargs):
class EndpointHTTPFactory(BaseHTTPFactory):

ap_handlers = (
(r"/spush/(?:(?P<api_ver>v\d+)\/)?(?P<token>[^\/]+)",
SimplePushHandler),
(r"/wpush/(?:(?P<api_ver>v\d+)\/)?(?P<token>[^\/]+)",
WebPushHandler),
(r"/m/(?P<message_id>[^\/]+)", MessageHandler),
Expand All @@ -160,6 +158,12 @@ def __init__(self,
routers, # type: Dict[str, IRouter]
**kwargs):
# type: (...) -> None
if ap_settings.enable_simplepush:
self.ap_handlers += (
(r"/spush/(?:(?P<api_ver>v\d+)\/)?(?P<token>[^\/]+)",
SimplePushHandler),
)
self.ap_handlers = tuple(self.ap_handlers)
BaseHTTPFactory.__init__(self, ap_settings, db=db, **kwargs)
self.routers = routers

Expand Down
2 changes: 0 additions & 2 deletions autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ def from_argparse(cls, ns):
cors=not ns.no_cors,
bear_hash_key=ns.auth_key,
proxy_protocol_port=ns.proxy_protocol_port,
use_cryptography=ns.use_cryptography,
)


Expand Down Expand Up @@ -282,5 +281,4 @@ def from_argparse(cls, ns):
auto_ping_timeout=ns.auto_ping_timeout,
max_connections=ns.max_connections,
close_handshake_timeout=ns.close_handshake_timeout,
use_cryptography=ns.use_cryptography,
)
4 changes: 4 additions & 0 deletions autopush/main_argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def add_shared_args(parser):
help="Use the cryptography library vs. JOSE",
action="store_true",
default=False, env_var="USE_CRYPTOGRAPHY")
parser.add_argument('--enable_simplepush',
help="Enable the deprecated Simplepush protocol",
action="store_true", default=True,
env_var="ENABLE_SIMPLEPUSH")
# No ENV because this is for humans
_add_external_router_args(parser)
_obsolete_args(parser)
Expand Down
9 changes: 5 additions & 4 deletions autopush/router/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
from autopush.router.fcm import FCMRouter
from autopush.settings import AutopushSettings # noqa

__all__ = ["APNSRouter", "FCMRouter", "GCMRouter", "SimpleRouter",
"WebPushRouter"]
__all__ = ["APNSRouter", "FCMRouter", "GCMRouter", "WebPushRouter",
"SimpleRouter"]


def routers_from_settings(settings, db, agent):
# type: (AutopushSettings, DatabaseManager, Agent) -> Dict[str, IRouter]
"""Create a dict of IRouters for the given settings"""
router_conf = settings.router_conf
routers = dict(
simplepush=SimpleRouter(
settings, router_conf.get("simplepush"), db, agent),
webpush=WebPushRouter(settings, None, db, agent)
)
if settings.enable_simplepush:
routers['simplepush'] = SimpleRouter(
settings, router_conf.get("simplepush"), db, agent)
if 'apns' in router_conf:
routers["apns"] = APNSRouter(settings, router_conf["apns"], db.metrics)
if 'gcm' in router_conf:
Expand Down
159 changes: 2 additions & 157 deletions autopush/router/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,23 @@
"""
import json
from urllib import urlencode
from StringIO import StringIO
from typing import Any # noqa

import requests
from boto.dynamodb2.exceptions import ItemNotFound
from boto.exception import JSONResponseError
from twisted.internet.threads import deferToThread
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
CancelledError,
)
from twisted.internet.error import (
ConnectError,
ConnectionClosed,
ConnectionRefusedError,
)
from twisted.logger import Logger
from twisted.web._newclient import ResponseFailed
from twisted.web.client import FileBodyProducer

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.protocol import IgnoreBody
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa
from autopush.router.webpush import WebPushRouter


class SimpleRouter(object):
class SimpleRouter(WebPushRouter):
"""Implements :class:`autopush.router.interface.IRouter` for internal
routing to an Autopush node
"""
log = Logger()

def __init__(self, ap_settings, router_conf, db, agent):
"""Create a new SimpleRouter"""
self.ap_settings = ap_settings
self.conf = router_conf
self.db = db
self.agent = agent
self.waker = None

@property
def metrics(self):
return self.db.metrics

def register(self, uaid, router_data, app_id, *args, **kwargs):
# type: (str, JSONDict, str, *Any, **Any) -> None
"""No additional routing data"""

def amend_endpoint_response(self, response, router_data):
# type: (JSONDict, JSONDict) -> None
"""Stubbed out for this router"""

def stored_response(self, notification):
self.metrics.increment("notification.message_data",
notification.data_length,
Expand All @@ -73,108 +35,6 @@ def delivered_response(self, notification):
tags=make_tags(destination='Direct'))
return RouterResponse(200, "Delivered")

@inlineCallbacks
def route_notification(self, notification, uaid_data):
"""Route a notification to an internal node, and store it if the node
can't deliver immediately or is no longer a valid node
"""
# Determine if they're connected at the moment
node_id = uaid_data.get("node_id")
uaid = uaid_data["uaid"]
self.udp = uaid_data.get("udp")
router = self.db.router

# Node_id is present, attempt delivery.
# - Send Notification to node
# - Success: Done, return 200
# - Error (Node busy): Jump to Save notification below
# - Error (Client gone, node gone/dead): Clear node entry for user
# - Both: Done, return 503
if node_id:
result = None
try:
result = yield self._send_notification(uaid, node_id,
notification)
except (ConnectError, ConnectionClosed, ResponseFailed,
CancelledError) as exc:
self.metrics.increment("updates.client.host_gone")
yield deferToThread(router.clear_node,
uaid_data).addErrback(self._eat_db_err)
if isinstance(exc, ConnectionRefusedError):
# Occurs if an IP record is now used by some other node
# in AWS or if the connection timesout.
self.log.debug("Could not route message: {exc}", exc=exc)
if result and result.code == 200:
returnValue(self.delivered_response(notification))

# Save notification, node is not present or busy
# - Save notification
# - Success (older version): Done, return 202
# - Error (db error): Done, return 503
try:
result = yield self._save_notification(uaid_data, notification)
if result is False:
returnValue(self.stored_response(notification))
except JSONResponseError:
raise RouterException("Error saving to database",
status_code=503,
response_body="Retry Request",
errno=201)

# - Lookup client
# - Success (node found): Notify node of new notification
# - Success: Done, return 200
# - Error (no client): Done, return 202
# - Error (no node): Clear node entry
# - Both: Done, return 202
# - Success (no node): Done, return 202
# - Error (db error): Done, return 202
# - Error (no client) : Done, return 404
try:
uaid_data = yield deferToThread(router.get_uaid, uaid)
except JSONResponseError:
returnValue(self.stored_response(notification))
except ItemNotFound:
self.metrics.increment("updates.client.deleted")
raise RouterException("User was deleted",
status_code=410,
response_body="Invalid UAID",
log_exception=False,
errno=105)

# Verify there's a node_id in here, if not we're done
node_id = uaid_data.get("node_id")
if not node_id:
returnValue(self.stored_response(notification))
try:
result = yield self._send_notification_check(uaid, node_id)
except (ConnectError, ConnectionClosed, ResponseFailed) as exc:
self.metrics.increment("updates.client.host_gone")
if isinstance(exc, ConnectionRefusedError):
self.log.debug("Could not route message: {exc}", exc=exc)
yield deferToThread(
router.clear_node,
uaid_data).addErrback(self._eat_db_err)
returnValue(self.stored_response(notification))

if result.code == 200:
returnValue(self.delivered_response(notification))
else:
ret_val = self.stored_response(notification)
if self.udp is not None and "server" in self.conf:
# Attempt to send off the UDP wake request.
try:
yield deferToThread(
requests.post(
self.conf["server"],
data=urlencode(self.udp["data"]),
cert=self.conf.get("cert"),
timeout=self.conf.get("server_timeout", 3)))
except Exception as exc:
self.log.debug("Could not send UDP wake request: {exc}",
exc=exc)
returnValue(ret_val)

#############################################################
# Blocking Helper Functions
#############################################################
Expand Down Expand Up @@ -203,18 +63,3 @@ def _send_notification(self, uaid, node_id, notification):
)
d.addCallback(IgnoreBody.ignore)
return d

def _send_notification_check(self, uaid, node_id):
"""Send a command to the node to check for notifications"""
url = node_id + "/notif/" + uaid
return self.agent.request(
"PUT",
url.encode("utf8"),
).addCallback(IgnoreBody.ignore)

#############################################################
# Error Callbacks
#############################################################
def _eat_db_err(self, fail):
"""errBack for ignoring provisioned throughput errors"""
fail.trap(JSONResponseError)
Loading

0 comments on commit 876c959

Please sign in to comment.