Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: Use modern metrics
Browse files Browse the repository at this point in the history
Closes #943
  • Loading branch information
jrconlin committed Jul 7, 2017
1 parent 40f15e0 commit 3258b31
Show file tree
Hide file tree
Showing 20 changed files with 221 additions and 146 deletions.
16 changes: 2 additions & 14 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
attrib,
Factory
)
from boto.exception import JSONResponseError, BotoServerError
from boto.exception import JSONResponseError
from boto.dynamodb2.exceptions import (
ConditionalCheckFailedException,
ItemNotFound,
Expand Down Expand Up @@ -298,17 +298,7 @@ def track_provisioned(func):
def wrapper(self, *args, **kwargs):
if TRACK_DB_CALLS:
DB_CALLS.append(func.__name__)
try:
return func(self, *args, **kwargs)
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.%s" % func.__name__)
raise
except JSONResponseError:
self.metrics.increment("error.jsonresponse.%s" % func.__name__)
raise
except BotoServerError:
self.metrics.increment("error.botoserver.%s" % func.__name__)
raise
return func(self, *args, **kwargs)
return wrapper


Expand Down Expand Up @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None):
chid=normalize_id(chid))
return True
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.delete_notification")
return False


Expand Down Expand Up @@ -678,7 +667,6 @@ def get_uaid(self, uaid):
# We unfortunately have to catch this here, as track_provisioned
# will not see this, since JSONResponseError is a subclass and
# will capture it
self.metrics.increment("error.provisioned.get_uaid")
raise
except JSONResponseError: # pragma: nocover
# We trap JSONResponseError because Moto returns text instead of
Expand Down
1 change: 0 additions & 1 deletion autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ def add_websocket(self):
self.clients)
site_factory = self.websocket_site_factory(settings, ws_factory)
self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf())
self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics)

@classmethod
def from_argparse(cls, ns):
Expand Down
19 changes: 16 additions & 3 deletions autopush/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,32 @@ def __init__(self, api_key, app_key, hostname, flush_interval=10,
# Build the fully qualified metric name: this client's namespace and the
# caller-supplied ``name`` joined by a dot ("<namespace>.<name>").
def _prefix_name(self, name):
return "%s.%s" % (self._namespace, name)

def _normalize_tags(self, tags):
    """Convert *tags* into a fresh list of tag entries.

    :param tags: A list or tuple of tag entries, or a dict mapping tag
        names to tag values.
    :returns: A new list. Sequence items are passed through unchanged;
        each dict entry is rendered as ``"key:value"``.
    :raises TypeError: If *tags* is not a list, tuple or dict.

    """
    if isinstance(tags, (list, tuple)):
        # Return a copy so the result never aliases a list owned by the
        # caller (for example a shared ``_base_tags`` list).
        return list(tags)
    if isinstance(tags, dict):
        return ["{}:{}".format(key, value) for key, value in tags.items()]
    raise TypeError("Could not normalize tags")

# Start the underlying metrics client. The configured flush interval is
# passed as both ``flush_interval`` and ``roll_up_interval``.
def start(self):
self._client.start(flush_interval=self._flush_interval,
roll_up_interval=self._flush_interval)

def increment(self, name, count=1, **kwargs):
def increment(self, name, count=1, tags=None, **kwargs):
if tags:
kwargs['tags'] = self._normalize_tags(tags)
self._client.increment(self._prefix_name(name), count, host=self._host,
**kwargs)

def gauge(self, name, count, **kwargs):
def gauge(self, name, count, tags=None, **kwargs):
if tags:
kwargs['tags'] = self._normalize_tags(tags)
self._client.gauge(self._prefix_name(name), count, host=self._host,
**kwargs)

def timing(self, name, duration, **kwargs):
def timing(self, name, duration, tags=None, **kwargs):
if tags:
kwargs['tags'] = self._normalize_tags(tags)
self._client.timing(self._prefix_name(name), value=duration,
host=self._host, **kwargs)

Expand Down
17 changes: 14 additions & 3 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, ap_settings, router_conf, metrics,
self.ap_settings = ap_settings
self._config = router_conf
self.metrics = metrics
self._base_tags = []
self._base_tags = ["platform:apns"]
self.apns = dict()
for rel_channel in self._config:
self.apns[rel_channel] = self._connect(rel_channel,
Expand Down Expand Up @@ -144,8 +144,11 @@ def _route(self, notification, router_data):
apns_client.send(router_token=router_token, payload=payload,
apns_id=apns_id)
except (ConnectionError, AttributeError) as ex:
self.metrics.increment("updates.client.bridge.apns.connection_err",
self._base_tags)
tags = ['application:{}'.format(rel_channel),
'reason:connection_error']
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
self.log.error("Connection Error sending to APNS",
log_failure=Failure(ex))
raise RouterException(
Expand All @@ -165,11 +168,19 @@ def _route(self, notification, router_data):

location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
tags = ['application:{}'.format(rel_channel)]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.sent",
tags=tags)

self.metrics.increment(
"updates.client.bridge.apns.%s.sent" %
router_data["rel_channel"],
self._base_tags
)
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags={'destination': 'Direct'})
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},
Expand Down
27 changes: 17 additions & 10 deletions autopush/router/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.collapseKey = router_conf.get("collapseKey", "webpush")
self.senderID = router_conf.get("senderID")
self.auth = router_conf.get("auth")
self._base_tags = []
self._base_tags = ["platform:fcm"]
try:
self.fcm = pyfcm.FCMNotification(api_key=self.auth)
except Exception as e:
Expand Down Expand Up @@ -199,16 +199,16 @@ def _route(self, notification, router_data):
self.log.error("Authentication Error: %s" % e)
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.metrics.increment("updates.client.bridge.fcm.connection_err",
self._base_tags)
tags = ["reason:connection_unavailable"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
self.log.warn("Could not connect to FCM server: %s" % e)
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled FCM Error: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.fcm.attempted",
self._base_tags)
return self._process_reply(result, notification, router_data,
ttl=router_ttl)

Expand All @@ -229,14 +229,18 @@ def _process_reply(self, reply, notification, router_data, ttl):
new_id = result.get('registration_id')
self.log.debug("FCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.fcm.failed.rereg",
self._base_tags)
tags = ["reason:reregister"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
if reply.get('failure'):
self.metrics.increment("updates.client.bridge.fcm.failed",
self._base_tags)
tags = ["reason:failure"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
reason = result.get('error', "Unreported")
err = self.reasonTable.get(reason)
if err.get("crit", False):
Expand All @@ -263,8 +267,11 @@ def _process_reply(self, reply, notification, router_data, ttl):
response_body=err['msg'],
router_data={},
)
self.metrics.increment("updates.client.bridge.fcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags={'destination': 'Direct'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
39 changes: 25 additions & 14 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, ap_settings, router_conf, metrics):
self.gcm[sid] = gcmclient.GCM(auth)
except:
raise IOError("GCM Bridge not initiated in main")
self._base_tags = []
self._base_tags = ["platform:gcm"]
self.log.debug("Starting GCM router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -122,15 +122,15 @@ def _route(self, notification, uaid_data):
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.log.warn("GCM Unavailable: %s" % e)
self.metrics.increment("updates.client.bridge.gcm.connection_err",
self._base_tags)
tags = ["reason:connection_unavailable"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
raise RouterException("Server error", status_code=502,
log_exception=False)
except Exception as e:
self.log.error("Unhandled exception in GCM Routing: %s" % e)
raise RouterException("Server error", status_code=500)
self.metrics.increment("updates.client.bridge.gcm.attempted",
self._base_tags)
return self._process_reply(result, uaid_data, ttl=router_ttl,
notification=notification)

Expand All @@ -148,16 +148,20 @@ def _process_reply(self, reply, uaid_data, ttl, notification):
for old_id, new_id in reply.canonical.items():
self.log.debug("GCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("updates.client.bridge.gcm.failed.rereg",
self._base_tags)
tags = ["reason:reregister"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
# naks:
# uninstall:
for reg_id in reply.not_registered:
self.metrics.increment("updates.client.bridge.gcm.failed.unreg",
self._base_tags)
tags = ["reason:unregistered"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
self.log.debug("GCM no longer registered: %s" % reg_id)
return RouterResponse(
status_code=410,
Expand All @@ -167,8 +171,10 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# for reg_id, err_code in reply.failed.items():
if len(reply.failed.items()) > 0:
self.metrics.increment("updates.client.bridge.gcm.failed.failure",
self._base_tags)
tags = ["reason:failure"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
self.log.debug("GCM failures: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM unable to deliver", status_code=410,
Expand All @@ -178,17 +184,22 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

# retries:
if reply.needs_retry():
self.metrics.increment("updates.client.bridge.gcm.failed.retry",
self._base_tags)
tags = ["reason:retry"]
tags.extend(self._base_tags)
self.metrics.increment("notification.bridge.error",
tags=tags)
self.log.warn("GCM retry requested: {failed()}",
failed=lambda: repr(reply.failed.items()))
raise RouterException("GCM failure to deliver, retry",
status_code=503,
response_body="Please try request later.",
log_exception=False)

self.metrics.increment("updates.client.bridge.gcm.succeeded",
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags={'destination': 'Direct'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
13 changes: 6 additions & 7 deletions autopush/router/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,15 @@ def amend_endpoint_response(self, response, router_data):
"""Stubbed out for this router"""

# Report the payload length of ``notification`` (0 when ``data`` is empty
# or None) on the "notification.message_data" gauge, tagged
# destination=Stored, then return the 202 "Notification Stored" response.
def stored_response(self, notification):
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags={'destination': 'Stored'})
return RouterResponse(202, "Notification Stored")

# Report the payload length of ``notification`` (0 when ``data`` is empty
# or None) on the "notification.message_data" gauge, tagged
# destination=Direct, then return the 200 "Delivered" response.
def delivered_response(self, notification):
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags={'destination': 'Direct'})
return RouterResponse(200, "Delivered")

@inlineCallbacks
Expand Down Expand Up @@ -98,7 +104,6 @@ def route_notification(self, notification, uaid_data):
# in AWS or if the connection times out.
self.log.debug("Could not route message: {exc}", exc=exc)
if result and result.code == 200:
self.metrics.increment("router.broadcast.hit")
returnValue(self.delivered_response(notification))

# Save notification, node is not present or busy
Expand All @@ -108,7 +113,6 @@ def route_notification(self, notification, uaid_data):
try:
result = yield self._save_notification(uaid_data, notification)
if result is False:
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))
except JSONResponseError:
raise RouterException("Error saving to database",
Expand All @@ -128,7 +132,6 @@ def route_notification(self, notification, uaid_data):
try:
uaid_data = yield deferToThread(router.get_uaid, uaid)
except JSONResponseError:
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))
except ItemNotFound:
self.metrics.increment("updates.client.deleted")
Expand All @@ -141,7 +144,6 @@ def route_notification(self, notification, uaid_data):
# Verify there's a node_id in here, if not we're done
node_id = uaid_data.get("node_id")
if not node_id:
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))
try:
result = yield self._send_notification_check(uaid, node_id)
Expand All @@ -152,14 +154,11 @@ def route_notification(self, notification, uaid_data):
yield deferToThread(
router.clear_node,
uaid_data).addErrback(self._eat_db_err)
self.metrics.increment("router.broadcast.miss")
returnValue(self.stored_response(notification))

if result.code == 200:
self.metrics.increment("router.broadcast.save_hit")
returnValue(self.delivered_response(notification))
else:
self.metrics.increment("router.broadcast.miss")
ret_val = self.stored_response(notification)
if self.udp is not None and "server" in self.conf:
# Attempt to send off the UDP wake request.
Expand Down
6 changes: 6 additions & 0 deletions autopush/router/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class WebPushRouter(SimpleRouter):
"""SimpleRouter subclass to store individual messages appropriately"""

def delivered_response(self, notification):
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags={'destination': 'Stored'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand All @@ -32,6 +35,9 @@ def delivered_response(self, notification):
logged_status=200)

def stored_response(self, notification):
self.metrics.gauge("notification.message_data",
len(notification.data or ""),
tags={'destination': 'Direct'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand Down
7 changes: 4 additions & 3 deletions autopush/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,10 @@ def parse_endpoint(self, metrics, token, version="v1", ckey_header=None,
vapid_auth = parse_auth_header(auth_header)
if not vapid_auth:
raise VapidAuthException("Invalid Auth token")
metrics.increment("updates.notification.auth.{}".format(
vapid_auth['scheme']
))
metrics.increment("notification.auth",
tags="vapid:{version},scheme:{scheme}".format(
**vapid_auth
).split(","))
# pull the public key from the VAPID auth header if needed
try:
if vapid_auth['version'] != 1:
Expand Down
Loading

0 comments on commit 3258b31

Please sign in to comment.