From 39db9a7d821c4dc531fecbcf71ce4a3fd59a6d96 Mon Sep 17 00:00:00 2001 From: jr conlin Date: Wed, 5 Jul 2017 14:55:58 -0700 Subject: [PATCH] feat: Use modern metrics Convert all tags to lists via make_tags Closes #943, #950 --- autopush/db.py | 16 +------ autopush/main.py | 1 - autopush/metrics.py | 10 ++++- autopush/router/apnsrouter.py | 17 ++++++-- autopush/router/fcm.py | 33 +++++++++------ autopush/router/gcm.py | 41 +++++++++++------- autopush/router/simple.py | 14 +++---- autopush/router/webpush.py | 7 ++++ autopush/settings.py | 7 ++-- autopush/tests/test_integration.py | 14 +++++-- autopush/tests/test_router.py | 28 +++++++++---- autopush/tests/test_web_base.py | 5 ++- autopush/tests/test_websocket.py | 30 +++++-------- autopush/utils.py | 9 +++- autopush/web/base.py | 21 +++++++++- autopush/web/registration.py | 5 +-- autopush/web/simplepush.py | 10 ++++- autopush/web/webpush.py | 40 +++++++++++++----- autopush/websocket.py | 67 +++++++++--------------------- 19 files changed, 221 insertions(+), 154 deletions(-) diff --git a/autopush/db.py b/autopush/db.py index 7060598e..8f330427 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -42,7 +42,7 @@ attrib, Factory ) -from boto.exception import JSONResponseError, BotoServerError +from boto.exception import JSONResponseError from boto.dynamodb2.exceptions import ( ConditionalCheckFailedException, ItemNotFound, @@ -298,17 +298,7 @@ def track_provisioned(func): def wrapper(self, *args, **kwargs): if TRACK_DB_CALLS: DB_CALLS.append(func.__name__) - try: - return func(self, *args, **kwargs) - except ProvisionedThroughputExceededException: - self.metrics.increment("error.provisioned.%s" % func.__name__) - raise - except JSONResponseError: - self.metrics.increment("error.jsonresponse.%s" % func.__name__) - raise - except BotoServerError: - self.metrics.increment("error.botoserver.%s" % func.__name__) - raise + return func(self, *args, **kwargs) return wrapper @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None): chid=normalize_id(chid)) return True except ProvisionedThroughputExceededException: - self.metrics.increment("error.provisioned.delete_notification") return False @@ -678,7 +667,6 @@ def get_uaid(self, uaid): # We unfortunately have to catch this here, as track_provisioned # will not see this, since JSONResponseError is a subclass and # will capture it - self.metrics.increment("error.provisioned.get_uaid") raise except JSONResponseError: # pragma: nocover # We trap JSONResponseError because Moto returns text instead of diff --git a/autopush/main.py b/autopush/main.py index fd2df944..e3bb82a3 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -260,7 +260,6 @@ def add_websocket(self): self.clients) site_factory = self.websocket_site_factory(settings, ws_factory) self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf()) - self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics) @classmethod def from_argparse(cls, ns): diff --git a/autopush/metrics.py b/autopush/metrics.py index 8ff0ccbb..c41f004d 100644 --- a/autopush/metrics.py +++ b/autopush/metrics.py @@ -1,5 +1,5 @@ """Metrics interface and implementations""" -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Sequence, Any # noqa from twisted.internet import reactor from txstatsd.client import StatsDClientProtocol, TwistedStatsDClient @@ -74,6 +74,14 @@ def timing(self, name, duration, **kwargs): self._metric.timing(name, duration) +def make_tags(base=None, **kwargs): + # type: (Sequence[str], **Any) -> 
Sequence[str] + """Generate a list of tag values""" + tags = list(base or []) + tags.extend('{}:{}'.format(key, val) for key, val in kwargs.iteritems()) + return tags + + class DatadogMetrics(object): """DataDog Metric backend""" def __init__(self, api_key, app_key, hostname, flush_interval=10, diff --git a/autopush/router/apnsrouter.py b/autopush/router/apnsrouter.py index ee188cd1..6e1f32c9 100644 --- a/autopush/router/apnsrouter.py +++ b/autopush/router/apnsrouter.py @@ -8,6 +8,7 @@ from twisted.python.failure import Failure from autopush.exceptions import RouterException +from autopush.metrics import make_tags from autopush.router.apns2 import ( APNSClient, APNS_MAX_CONNECTIONS, @@ -62,7 +63,7 @@ def __init__(self, ap_settings, router_conf, metrics, self.ap_settings = ap_settings self._config = router_conf self.metrics = metrics - self._base_tags = [] + self._base_tags = ["platform:apns"] self.apns = dict() for rel_channel in self._config: self.apns[rel_channel] = self._connect(rel_channel, @@ -144,8 +145,10 @@ def _route(self, notification, router_data): apns_client.send(router_token=router_token, payload=payload, apns_id=apns_id) except (ConnectionError, AttributeError) as ex: - self.metrics.increment("updates.client.bridge.apns.connection_err", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags(self._base_tags, + application=rel_channel, + reason="connection_error")) self.log.error("Connection Error sending to APNS", log_failure=Failure(ex)) raise RouterException( @@ -165,11 +168,19 @@ def _route(self, notification, router_data): location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) + self.metrics.increment("notification.bridge.sent", + tags=make_tags(self._base_tags, + application=rel_channel)) + self.metrics.increment( "updates.client.bridge.apns.%s.sent" % router_data["rel_channel"], self._base_tags ) + self.metrics.gauge("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination='Direct')) return RouterResponse(status_code=201, response_body="", headers={"TTL": notification.ttl, "Location": location}, diff --git a/autopush/router/fcm.py b/autopush/router/fcm.py index 12bb02a8..fe203ee5 100644 --- a/autopush/router/fcm.py +++ b/autopush/router/fcm.py @@ -7,6 +7,7 @@ from twisted.logger import Logger from autopush.exceptions import RouterException +from autopush.metrics import make_tags from autopush.router.interface import RouterResponse from autopush.types import JSONDict # noqa @@ -110,7 +111,7 @@ def __init__(self, ap_settings, router_conf, metrics): self.collapseKey = router_conf.get("collapseKey", "webpush") self.senderID = router_conf.get("senderID") self.auth = router_conf.get("auth") - self._base_tags = [] + self._base_tags = ["platform:fcm"] try: self.fcm = pyfcm.FCMNotification(api_key=self.auth) except Exception as e: @@ -165,12 +166,12 @@ def _route(self, notification, router_data): # correct encryption headers are included with the data. if notification.data: mdata = self.config.get('max_data', 4096) - if len(notification.data) > mdata: + if notification.data_length > mdata: raise self._error("This message is intended for a " + "constrained device and is limited " + "to 3070 bytes. 
Converted buffer too " + "long by %d bytes" % - (len(notification.data) - mdata), + (notification.data_length - mdata), 413, errno=104, log_exception=False) data['body'] = notification.data @@ -199,16 +200,16 @@ def _route(self, notification, router_data): self.log.error("Authentication Error: %s" % e) raise RouterException("Server error", status_code=500) except ConnectionError as e: - self.metrics.increment("updates.client.bridge.fcm.connection_err", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags( + self._base_tags, + reason="connection_unavailable")) self.log.warn("Could not connect to FCM server: %s" % e) raise RouterException("Server error", status_code=502, log_exception=False) except Exception as e: self.log.error("Unhandled FCM Error: %s" % e) raise RouterException("Server error", status_code=500) - self.metrics.increment("updates.client.bridge.fcm.attempted", - self._base_tags) return self._process_reply(result, notification, router_data, ttl=router_ttl) @@ -229,20 +230,22 @@ def _process_reply(self, reply, notification, router_data, ttl): new_id = result.get('registration_id') self.log.debug("FCM id changed : {old} => {new}", old=old_id, new=new_id) - self.metrics.increment("updates.client.bridge.fcm.failed.rereg", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags(self._base_tags, + reason="reregister")) return RouterResponse(status_code=503, response_body="Please try request again.", router_data=dict(token=new_id)) if reply.get('failure'): - self.metrics.increment("updates.client.bridge.fcm.failed", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags(self._base_tags, + reason="failure")) reason = result.get('error', "Unreported") err = self.reasonTable.get(reason) if err.get("crit", False): self.log.critical( err['msg'], - nlen=len(notification.data), + nlen=notification.data_length, regid=router_data["token"], senderid=self.senderID, ttl=notification.ttl, @@ -263,8 +266,12 @@ def _process_reply(self, reply, notification, router_data, ttl): response_body=err['msg'], router_data={}, ) - self.metrics.increment("updates.client.bridge.fcm.succeeded", + self.metrics.increment("notification.bridge.sent", self._base_tags) + self.metrics.gauge("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination="Direct")) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/gcm.py b/autopush/router/gcm.py index 02593989..0e6071f9 100644 --- a/autopush/router/gcm.py +++ b/autopush/router/gcm.py @@ -7,6 +7,7 @@ from twisted.logger import Logger from autopush.exceptions import RouterException +from autopush.metrics import make_tags from autopush.router.interface import RouterResponse from autopush.types import JSONDict # noqa @@ -38,7 +39,7 @@ def __init__(self, ap_settings, router_conf, metrics): self.gcm[sid] = gcmclient.GCM(auth) except: raise IOError("GCM Bridge not initiated in main") - self._base_tags = [] + self._base_tags = ["platform:gcm"] self.log.debug("Starting GCM router...") def amend_endpoint_response(self, response, router_data): @@ -82,12 +83,12 @@ def _route(self, notification, uaid_data): # correct encryption headers are included with the data. 
if notification.data: mdata = self.config.get('max_data', 4096) - if len(notification.data) > mdata: + if notification.data_length > mdata: raise self._error("This message is intended for a " + "constrained device and is limited " + "to 3070 bytes. Converted buffer too " + "long by %d bytes" % - (len(notification.data) - mdata), + (notification.data_length - mdata), 413, errno=104, log_exception=False) data['body'] = notification.data @@ -122,15 +123,15 @@ def _route(self, notification, uaid_data): raise RouterException("Server error", status_code=500) except ConnectionError as e: self.log.warn("GCM Unavailable: %s" % e) - self.metrics.increment("updates.client.bridge.gcm.connection_err", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags( + self._base_tags, + reason="connection_unavailable")) raise RouterException("Server error", status_code=502, log_exception=False) except Exception as e: self.log.error("Unhandled exception in GCM Routing: %s" % e) raise RouterException("Server error", status_code=500) - self.metrics.increment("updates.client.bridge.gcm.attempted", - self._base_tags) return self._process_reply(result, uaid_data, ttl=router_ttl, notification=notification) @@ -148,16 +149,18 @@ def _process_reply(self, reply, uaid_data, ttl, notification): for old_id, new_id in reply.canonical.items(): self.log.debug("GCM id changed : {old} => {new}", old=old_id, new=new_id) - self.metrics.increment("updates.client.bridge.gcm.failed.rereg", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags(self._base_tags, + reason="reregister")) return RouterResponse(status_code=503, response_body="Please try request again.", router_data=dict(token=new_id)) # naks: # uninstall: for reg_id in reply.not_registered: - self.metrics.increment("updates.client.bridge.gcm.failed.unreg", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags(self._base_tags, + reason="unregistered")) self.log.debug("GCM no longer registered: %s" % reg_id) return RouterResponse( status_code=410, @@ -167,8 +170,9 @@ def _process_reply(self, reply, uaid_data, ttl, notification): # for reg_id, err_code in reply.failed.items(): if len(reply.failed.items()) > 0: - self.metrics.increment("updates.client.bridge.gcm.failed.failure", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags(self._base_tags, + reason="failure")) self.log.debug("GCM failures: {failed()}", failed=lambda: repr(reply.failed.items())) raise RouterException("GCM unable to deliver", status_code=410, @@ -178,8 +182,9 @@ def _process_reply(self, reply, uaid_data, ttl, notification): # retries: if reply.needs_retry(): - self.metrics.increment("updates.client.bridge.gcm.failed.retry", - self._base_tags) + self.metrics.increment("notification.bridge.error", + tags=make_tags(self._base_tags, + reason="retry")) self.log.warn("GCM retry requested: {failed()}", failed=lambda: repr(reply.failed.items())) raise RouterException("GCM failure to deliver, retry", @@ -187,8 +192,12 @@ def _process_reply(self, reply, uaid_data, ttl, notification): response_body="Please try request later.", log_exception=False) - self.metrics.increment("updates.client.bridge.gcm.succeeded", + self.metrics.increment("notification.bridge.sent", self._base_tags) + self.metrics.gauge("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination='Direct')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, 
notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/simple.py b/autopush/router/simple.py index 34314d00..97b18177 100644 --- a/autopush/router/simple.py +++ b/autopush/router/simple.py @@ -29,6 +29,7 @@ from twisted.web.client import FileBodyProducer from autopush.exceptions import RouterException +from autopush.metrics import make_tags from autopush.protocol import IgnoreBody from autopush.router.interface import RouterResponse from autopush.types import JSONDict # noqa @@ -61,9 +62,15 @@ def amend_endpoint_response(self, response, router_data): """Stubbed out for this router""" def stored_response(self, notification): + self.metrics.gauge("notification.message_data", + notification.data_length, + tags=make_tags(destination='Stored')) return RouterResponse(202, "Notification Stored") def delivered_response(self, notification): + self.metrics.gauge("notification.message_data", + notification.data_length, + tags=make_tags(destination='Direct')) return RouterResponse(200, "Delivered") @inlineCallbacks @@ -98,7 +105,6 @@ def route_notification(self, notification, uaid_data): # in AWS or if the connection timesout. self.log.debug("Could not route message: {exc}", exc=exc) if result and result.code == 200: - self.metrics.increment("router.broadcast.hit") returnValue(self.delivered_response(notification)) # Save notification, node is not present or busy @@ -108,7 +114,6 @@ def route_notification(self, notification, uaid_data): try: result = yield self._save_notification(uaid_data, notification) if result is False: - self.metrics.increment("router.broadcast.miss") returnValue(self.stored_response(notification)) except JSONResponseError: raise RouterException("Error saving to database", @@ -128,7 +133,6 @@ def route_notification(self, notification, uaid_data): try: uaid_data = yield deferToThread(router.get_uaid, uaid) except JSONResponseError: - self.metrics.increment("router.broadcast.miss") returnValue(self.stored_response(notification)) except ItemNotFound: self.metrics.increment("updates.client.deleted") @@ -141,7 +145,6 @@ def route_notification(self, notification, uaid_data): # Verify there's a node_id in here, if not we're done node_id = uaid_data.get("node_id") if not node_id: - self.metrics.increment("router.broadcast.miss") returnValue(self.stored_response(notification)) try: result = yield self._send_notification_check(uaid, node_id) @@ -152,14 +155,11 @@ def route_notification(self, notification, uaid_data): yield deferToThread( router.clear_node, uaid_data).addErrback(self._eat_db_err) - self.metrics.increment("router.broadcast.miss") returnValue(self.stored_response(notification)) if result.code == 200: - self.metrics.increment("router.broadcast.save_hit") returnValue(self.delivered_response(notification)) else: - self.metrics.increment("router.broadcast.miss") ret_val = self.stored_response(notification) if self.udp is not None and "server" in self.conf: # Attempt to send off the UDP wake request. 
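(Reviewer sketch) The bridge routers above all follow the same pattern: per-platform counter names such as `updates.client.bridge.gcm.failed.retry` collapse into a single `notification.bridge.error` metric, with the platform and failure reason carried as tags built by the new `make_tags` helper. A minimal, self-contained illustration — the function body is copied from `autopush/metrics.py` in this patch; the example values are hypothetical:

```python
def make_tags(base=None, **kwargs):
    """Generate a list of tag values (as added in autopush/metrics.py)."""
    tags = list(base or [])
    tags.extend('{}:{}'.format(key, val) for key, val in kwargs.iteritems())
    return tags


# A GCM router is built with _base_tags = ["platform:gcm"]; a retryable
# failure then reports one metric name with the reason carried as a tag:
print(sorted(make_tags(["platform:gcm"], reason="retry")))
# ['platform:gcm', 'reason:retry']
```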
diff --git a/autopush/router/webpush.py b/autopush/router/webpush.py index c4c26bc7..4591fcc1 100644 --- a/autopush/router/webpush.py +++ b/autopush/router/webpush.py @@ -13,6 +13,7 @@ from twisted.web.client import FileBodyProducer from autopush.exceptions import RouterException +from autopush.metrics import make_tags from autopush.protocol import IgnoreBody from autopush.router.interface import RouterResponse from autopush.router.simple import SimpleRouter @@ -24,6 +25,9 @@ class WebPushRouter(SimpleRouter): """SimpleRouter subclass to store individual messages appropriately""" def delivered_response(self, notification): + self.metrics.gauge("notification.message_data", + notification.data_length, + tags=make_tags(destination='Stored')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", @@ -32,6 +36,9 @@ def delivered_response(self, notification): logged_status=200) def stored_response(self, notification): + self.metrics.gauge("notification.message_data", + notification.data_length, + tags=make_tags(destination='Direct')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/settings.py b/autopush/settings.py index ac68e082..d073d2cb 100644 --- a/autopush/settings.py +++ b/autopush/settings.py @@ -339,9 +339,10 @@ def parse_endpoint(self, metrics, token, version="v1", ckey_header=None, vapid_auth = parse_auth_header(auth_header) if not vapid_auth: raise VapidAuthException("Invalid Auth token") - metrics.increment("updates.notification.auth.{}".format( - vapid_auth['scheme'] - )) + metrics.increment("notification.auth", + tags="vapid:{version},scheme:{scheme}".format( + **vapid_auth + ).split(",")) # pull the public key from the VAPID auth header if needed try: if vapid_auth['version'] != 1: diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index fd0fe897..c2aa9004 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -13,7 +13,7 @@ from distutils.spawn import find_executable from StringIO import StringIO from httplib import HTTPResponse # noqa -from mock import Mock, call +from mock import Mock, call, patch from unittest.case import SkipTest from zope.interface import implementer @@ -47,7 +47,7 @@ from autopush.main import ConnectionApplication, EndpointApplication from autopush.settings import AutopushSettings from autopush.utils import base64url_encode -from autopush.metrics import SinkMetrics +from autopush.metrics import SinkMetrics, DatadogMetrics from autopush.tests.support import TestingLogObserver from autopush.websocket import PushServerFactory @@ -675,8 +675,9 @@ def test_webpush_data_delivery_to_connected_client(self): )) yield self.shut_down(client) + @patch("autopush.metrics.datadog") @inlineCallbacks - def test_webpush_data_delivery_to_disconnected_client(self): + def test_webpush_data_delivery_to_disconnected_client(self, m_ddog): tests = { "d248d4e0-0ef4-41d9-8db5-2533ad8e4041": dict( data=b"\xe2\x82\x28\xf0\x28\x8c\xbc", result="4oIo8CiMvA"), @@ -688,6 +689,11 @@ def test_webpush_data_delivery_to_disconnected_client(self): "6c33e055-5762-47e5-b90c-90ad9bfe3f53": dict( data=b"\xc3\x28\xa0\xa1\xe2\x28\xa1", result="wyigoeIooQ"), } + # Piggy back a check for stored source metrics + self.conn.db.metrics = DatadogMetrics( + "someapikey", "someappkey", namespace="testpush", + hostname="localhost") + 
self.conn.db.metrics._client = Mock() client = Client("ws://localhost:9010/", use_webpush=True) yield client.connect() @@ -716,6 +722,8 @@ def test_webpush_data_delivery_to_disconnected_client(self): ok_(self.logs.logged_ci(lambda ci: 'message_size' in ci), "message_size not logged") + eq_(self.conn.db.metrics._client.gauge.call_args[1]['tags'], + ['source:Stored']) yield self.shut_down(client) @inlineCallbacks diff --git a/autopush/tests/test_router.py b/autopush/tests/test_router.py index 40a1998d..17018257 100644 --- a/autopush/tests/test_router.py +++ b/autopush/tests/test_router.py @@ -543,11 +543,17 @@ def check_results(fail): def test_router_notification_gcm_id_change(self): self.mock_result.canonical["old"] = "new" self.router.gcm['test123'] = self.gcm + self.router.metrics = Mock() d = self.router.route_notification(self.notif, self.router_data) def check_results(result): ok_(isinstance(result, RouterResponse)) eq_(result.router_data, dict(token="new")) + eq_(self.router.metrics.increment.call_args[0][0], + 'notification.bridge.error') + self.router.metrics.increment.call_args[1]['tags'].sort() + eq_(self.router.metrics.increment.call_args[1]['tags'], + ['platform:gcm', 'reason:reregister']) ok_(self.router.gcm['test123'].send.called) d.addCallback(check_results) return d @@ -555,11 +561,17 @@ def check_results(result): def test_router_notification_gcm_not_regged(self): self.mock_result.not_registered = {"connect_data": True} self.router.gcm['test123'] = self.gcm + self.router.metrics = Mock() d = self.router.route_notification(self.notif, self.router_data) def check_results(result): ok_(isinstance(result, RouterResponse)) eq_(result.router_data, dict()) + eq_(self.router.metrics.increment.call_args[0][0], + 'notification.bridge.error') + self.router.metrics.increment.call_args[1]['tags'].sort() + eq_(self.router.metrics.increment.call_args[1]['tags'], + ['platform:gcm', 'reason:unregistered']) ok_(self.router.gcm['test123'].send.called) d.addCallback(check_results) return d @@ -573,7 +585,10 @@ def test_router_notification_gcm_failed_items(self): def check_results(fail): ok_(self.router.metrics.increment.called) eq_(self.router.metrics.increment.call_args[0][0], - 'updates.client.bridge.gcm.failed.failure') + 'notification.bridge.error') + self.router.metrics.increment.call_args[1]['tags'].sort() + eq_(self.router.metrics.increment.call_args[1]['tags'], + ['platform:gcm', 'reason:failure']) eq_(fail.value.message, 'GCM unable to deliver') self._check_error_call(fail.value, 410) d.addBoth(check_results) @@ -588,7 +603,10 @@ def test_router_notification_gcm_needs_retry(self): def check_results(fail): ok_(self.router.metrics.increment.called) eq_(self.router.metrics.increment.call_args[0][0], - 'updates.client.bridge.gcm.failed.retry') + 'notification.bridge.error') + self.router.metrics.increment.call_args[1]['tags'].sort() + eq_(self.router.metrics.increment.call_args[1]['tags'], + ['platform:gcm', 'reason:retry']) eq_(fail.value.message, 'GCM failure to deliver, retry') self._check_error_call(fail.value, 503) d.addBoth(check_results) @@ -1113,9 +1131,6 @@ def test_route_to_busy_node_saves_looks_up_and_sends_check_200(self): def verify_deliver(result): ok_(isinstance(result, RouterResponse)) eq_(result.status_code, 200) - self.metrics.increment.assert_called_with( - "router.broadcast.save_hit" - ) d.addBoth(verify_deliver) return d @@ -1202,9 +1217,6 @@ def verify_deliver(result): eq_(t_h.get('encryption'), self.headers.get('encryption')) eq_(t_h.get('crypto_key'), 
self.headers.get('crypto-key')) eq_(t_h.get('encoding'), self.headers.get('content-encoding')) - self.metrics.increment.assert_called_with( - "router.broadcast.save_hit" - ) ok_("Location" in result.headers) d.addCallback(verify_deliver) diff --git a/autopush/tests/test_web_base.py b/autopush/tests/test_web_base.py index f9a518fd..e29f5ac1 100644 --- a/autopush/tests/test_web_base.py +++ b/autopush/tests/test_web_base.py @@ -17,6 +17,8 @@ from autopush.http import EndpointHTTPFactory from autopush.exceptions import InvalidRequest from autopush.settings import AutopushSettings +from autopush.metrics import SinkMetrics +from autopush.tests.support import test_db dummy_request_id = "11111111-1234-1234-1234-567812345678" dummy_uaid = str(uuid.UUID("abad1dea00000000aabbccdd00000000")) @@ -58,7 +60,8 @@ def setUp(self): host='example.com:8080') self.base = BaseWebHandler( - EndpointHTTPFactory(settings, db=None, routers=None), + EndpointHTTPFactory(settings, db=test_db(SinkMetrics()), + routers=None), self.request_mock ) self.status_mock = self.base.set_status = Mock() diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index 388e0abc..16bc5cd5 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -126,6 +126,8 @@ def setUp(self): self.proto._log_exc = False self.proto.log = Mock(spec=Logger) + self.proto.debug = True + self.proto.sendMessage = self.send_mock = Mock() self.orig_close = self.proto.sendClose request_mock = Mock(spec=ConnectionRequest) @@ -222,7 +224,6 @@ def test_nuke_connection(self, mock_reactor): self.proto.state = "" self.proto.ps.uaid = uuid.uuid4().hex self.proto.nukeConnection() - ok_(self.proto.metrics.increment.called) @patch("autopush.websocket.reactor") def test_nuke_connection_shutdown_ran(self, mock_reactor): @@ -264,17 +265,6 @@ def test_base_tags(self): 'ua_browser_family:Firefox', 'host:example.com:8080'])) - def test_reporter(self): - self.metrics.reset_mock() - self.factory.periodic_reporter(self.metrics) - - # Verify metric increase of nothing - calls = self.metrics.method_calls - eq_(len(calls), 4) - name, args, _ = calls[0] - eq_(name, "gauge") - eq_(args, ("update.client.writers", 0)) - def test_handshake_sub(self): self.factory.externalPort = 80 @@ -435,10 +425,10 @@ def test_close_with_delivery_cleanup_and_get_no_result(self): # Close the connection self.proto.onClose(True, None, None) - yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2) - eq_(len(self.metrics.mock_calls), 3) - self.metrics.increment.assert_called_with( - "client.notify_uaid_failure", tags=None) + yield self._wait_for(lambda: len(self.metrics.mock_calls) > 0) + eq_(self.metrics.timing.call_args[0][0], 'ua.connection.lifespan') + # Wait for final cleanup (no error or metric produced) + yield sleep(.25) @inlineCallbacks def test_close_with_delivery_cleanup_and_get_uaid_error(self): @@ -462,10 +452,9 @@ def raise_item(*args, **kwargs): # Close the connection self.proto.onClose(True, None, None) - yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2) - eq_(len(self.metrics.mock_calls), 3) - self.metrics.increment.assert_called_with( - "client.lookup_uaid_failure", tags=None) + yield self._wait_for(lambda: self.proto.log.info.called) + # Wait for final cleanup (no error or metric produced) + yield sleep(.25) @inlineCallbacks def test_close_with_delivery_cleanup_and_no_node_id(self): @@ -1766,6 +1755,7 @@ def test_notif_finished_with_too_many_messages(self): d = Deferred() def check(*args, **kwargs): + 
eq_(self.metrics.gauge.call_args[1]['tags'], ["source:Direct"]) ok_(self.proto.force_retry.called) ok_(self.send_mock.called) d.callback(True) diff --git a/autopush/utils.py b/autopush/utils.py index 99d8cbca..5423fbcb 100644 --- a/autopush/utils.py +++ b/autopush/utils.py @@ -270,6 +270,7 @@ class WebPushNotification(object): timestamp = attrib(default=Factory(lambda: int(time.time()))) # type: int sortkey_timestamp = attrib(default=None) # type: Optional[int] topic = attrib(default=None) # type: Optional[str] + source = attrib(default="Direct") # type: Optional[str] message_id = attrib(default=None) # type: str @@ -432,6 +433,11 @@ def location(self): """Return an appropriate value for the Location header""" return self.message_id + @property + def data_length(self): + """Return the length of the data""" + return len(self.data or "") + def expired(self, at_time=None): # type: (Optional[int]) -> bool """Indicates whether the message has expired or not @@ -460,7 +466,8 @@ def from_message_table(cls, uaid, item): message_id=key_info["message_id"], update_id=item.get("updateid"), timestamp=item.get("timestamp"), - sortkey_timestamp=key_info.get("sortkey_timestamp") + sortkey_timestamp=key_info.get("sortkey_timestamp"), + source="Stored" ) # Ensure we generate the sort-key properly for legacy messges diff --git a/autopush/web/base.py b/autopush/web/base.py index cce2425d..bd70d772 100644 --- a/autopush/web/base.py +++ b/autopush/web/base.py @@ -134,6 +134,10 @@ class Notification(object): data = attrib() channel_id = attrib() + @property + def data_length(self): + return len(self.data or "") + class BaseWebHandler(BaseHandler): """Common overrides for Push web API's""" @@ -267,10 +271,11 @@ def _router_response(self, response): errno=response.errno or 999, message=response.response_body) - def _router_fail_err(self, fail): + def _router_fail_err(self, fail, router_type=None, vapid=False): """errBack for router failures""" fail.trap(RouterException) exc = fail.value + success = False if exc.log_exception: if exc.status_code >= 500: fmt = fail.value.message or 'Exception' @@ -283,12 +288,26 @@ def _router_fail_err(self, fail): self.log.debug(format="Success", status_code=exc.status_code, logged_status=exc.logged_status or 0, client_info=self._client_info) + success = True + self.metrics.increment('notification.message.success', + tags=[ + 'destination:Direct', + 'router:{}'.format(router_type), + 'vapid:{}'.format(vapid is not None) + ]) elif 400 <= exc.status_code < 500: self.log.debug(format="Client error", status_code=exc.status_code, logged_status=exc.logged_status or 0, errno=exc.errno or 0, client_info=self._client_info) + if not success: + self.metrics.increment('notification.message.error', + tags=[ + "code:{}".format(exc.status_code), + "router:{}".format(router_type), + "vapid:{}".format(vapid is not None) + ]) self._router_response(exc) def _write_validation_err(self, errors): diff --git a/autopush/web/registration.py b/autopush/web/registration.py index f13ae4ec..c85b64b5 100644 --- a/autopush/web/registration.py +++ b/autopush/web/registration.py @@ -337,8 +337,7 @@ def post(self, router_type, router_data): Router type/data registration. 
""" - self.metrics.increment("updates.client.register", - tags=self.base_tags()) + self.metrics.increment("ua.command.register", tags=self.base_tags()) uaid = uuid.uuid4() @@ -455,7 +454,7 @@ class ChannelRegistrationHandler(BaseRegistrationHandler): @threaded_validate(UnregisterChidSchema) def delete(self, uaid, chid): # type: (uuid.UUID, str) -> Deferred - self.metrics.increment("updates.client.unregister", + self.metrics.increment("ua.command.unregister", tags=self.base_tags()) d = deferToThread(self._delete_channel, uaid, chid) d.addCallback(self._success) diff --git a/autopush/web/simplepush.py b/autopush/web/simplepush.py index 6f5cc469..fd31d216 100644 --- a/autopush/web/simplepush.py +++ b/autopush/web/simplepush.py @@ -21,6 +21,7 @@ ) from autopush.db import hasher +from autopush.metrics import make_tags from autopush.web.base import ( threaded_validate, Notification, @@ -128,14 +129,21 @@ def put(self, subscription, version, data): def _router_completed(self, response, uaid_data, warning=""): """Called after router has completed successfully""" + dest = 'Direct' if response.status_code == 200 or response.logged_status == 200: self.log.debug(format="Successful delivery", client_info=self._client_info) elif response.status_code == 202 or response.logged_status == 202: self.log.debug(format="Router miss, message stored.", client_info=self._client_info) + dest = 'Stored' time_diff = time.time() - self._start_time - self.metrics.timing("updates.handled", duration=time_diff) + self.metrics.timing("notification.request_time", duration=time_diff) + self.metrics.increment('notification.message.success', + tags=make_tags( + destination=dest, + router='simplepush') + ) response.response_body = ( response.response_body + " " + warning).strip() self._router_response(response) diff --git a/autopush/web/webpush.py b/autopush/web/webpush.py index f97db29a..8996161d 100644 --- a/autopush/web/webpush.py +++ b/autopush/web/webpush.py @@ -26,7 +26,7 @@ from autopush.crypto_key import CryptoKey from autopush.db import DatabaseManager # noqa -from autopush.metrics import Metrics # noqa +from autopush.metrics import Metrics, make_tags # noqa from autopush.db import dump_uaid, hasher from autopush.exceptions import ( InvalidRequest, @@ -117,7 +117,8 @@ def _validate_webpush(self, d, result): log.debug(format="Dropping User", code=102, uaid_hash=hasher(uaid), uaid_record=dump_uaid(result)) - metrics.increment("updates.drop_user", tags={"errno": 102}) + metrics.increment("updates.drop_user", + tags=make_tags(errno=102)) db.router.drop_user(uaid) raise InvalidRequest("No such subscription", status_code=410, errno=106) @@ -127,7 +128,8 @@ def _validate_webpush(self, d, result): log.debug(format="Dropping User", code=103, uaid_hash=hasher(uaid), uaid_record=dump_uaid(result)) - metrics.increment("updates.drop_user", tags={"errno": 103}) + metrics.increment("updates.drop_user", + tags=make_tags(errno=103)) db.router.drop_user(uaid) raise InvalidRequest("No such subscription", status_code=410, errno=106) @@ -461,20 +463,26 @@ def post(self, uaid_hash=hasher(user_data.get("uaid")), channel_id=user_data.get("chid"), router_key=user_data["router_type"], - message_size=len(notification.data or ""), + message_size=notification.data_length, message_ttl=notification.ttl, version=notification.version, encoding=encoding, ) - router = self.routers[user_data["router_type"]] + router_type = user_data["router_type"] + router = self.routers[router_type] self._router_time = time.time() d = 
maybeDeferred(router.route_notification, notification, user_data) - d.addCallback(self._router_completed, user_data, "") - d.addErrback(self._router_fail_err) + d.addCallback(self._router_completed, user_data, "", + router_type=router_type, + vapid=jwt) + d.addErrback(self._router_fail_err, + router_type=router_type, + vapid=jwt is not None) d.addErrback(self._response_err) return d - def _router_completed(self, response, uaid_data, warning=""): + def _router_completed(self, response, uaid_data, warning="", + router_type=None, vapid=None): """Called after router has completed successfully""" # Log the time taken for routing self._timings["route_time"] = time.time() - self._router_time @@ -502,10 +510,13 @@ def _router_completed(self, response, uaid_data, warning=""): d.addCallback(lambda x: self._router_completed( response, uaid_data, - warning)) + warning, + router_type, + vapid)) return d else: # No changes are requested by the bridge system, proceed as normal + dest = 'Direct' if response.status_code == 200 or response.logged_status == 200: self.log.debug(format="Successful delivery", client_info=self._client_info) @@ -513,7 +524,16 @@ def _router_completed(self, response, uaid_data, warning=""): self.log.debug( format="Router miss, message stored.", client_info=self._client_info) - self.metrics.timing("updates.handled", duration=time_diff) + dest = 'Stored' + self.metrics.timing("notification.request_time", + duration=time_diff) + self.metrics.increment('notification.message.success', + tags=make_tags( + destination=dest, + router=router_type, + vapid=(vapid is not None)) + ) + response.response_body = ( response.response_body + " " + warning).strip() self._router_response(response) diff --git a/autopush/websocket.py b/autopush/websocket.py index 737f0704..62339914 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -94,7 +94,7 @@ from autopush.exceptions import MessageOverloadException from autopush.noseplugin import track_object from autopush.protocol import IgnoreBody -from autopush.metrics import IMetrics # noqa +from autopush.metrics import IMetrics, make_tags # noqa from autopush.settings import AutopushSettings # noqa from autopush.ssl import AutopushSSLContextFactory from autopush.types import JSONDict # noqa @@ -244,9 +244,6 @@ def __attrs_post_init__(self): if self.stats.host: self._base_tags.append("host:%s" % self.stats.host) - self.db.metrics.increment("client.socket.connect", - tags=self._base_tags or None) - # Message table rotation initial settings self.message_month = self.db.current_msg_month @@ -434,13 +431,9 @@ def _sendAutoPing(self): """Override for sanity checking during auto-ping interval""" if not self.ps.uaid: # No uaid yet, drop the connection - self.metrics.increment("client.autoping.no_uaid", - tags=self.base_tags) self.sendClose() elif self.factory.clients.get(self.ps.uaid) != self: # UAID, but we're not in clients anymore for some reason - self.metrics.increment("client.autoping.invalid_client", - tags=self.base_tags) self.sendClose() return WebSocketServerProtocol._sendAutoPing(self) @@ -457,14 +450,7 @@ def nukeConnection(self): still hadn't run by this point""" # Did onClose get called? If so, we shutdown properly, no worries. 
if hasattr(self, "_shutdown_ran"): - self.metrics.increment("client.success.sendClose", - tags=self.base_tags) return - - # Uh-oh, we have not been shut-down properly, report detailed data - self.metrics.increment("client.error.sendClose_failed", - tags=self.base_tags) - self.transport.abortConnection() @log_exception @@ -586,9 +572,8 @@ def onClose(self, wasClean, code, reason): def cleanUp(self, wasClean, code, reason): """Thorough clean-up method to cancel all remaining deferreds, and send connection metrics in""" - self.metrics.increment("client.socket.disconnect", tags=self.base_tags) elapsed = (ms_time() - self.ps.connected_at) / 1000.0 - self.metrics.timing("client.socket.lifespan", duration=elapsed, + self.metrics.timing("ua.connection.lifespan", duration=elapsed, tags=self.base_tags) self.ps.stats.connection_time = int(elapsed) @@ -659,15 +644,11 @@ def _trap_uaid_not_found(self, fail): # type: (failure.Failure) -> None """Traps UAID not found error""" fail.trap(ItemNotFound) - self.metrics.increment("client.lookup_uaid_failure", - tags=self.base_tags) def _notify_node(self, result): """Checks the result of lookup node to send the notify if the client is connected elsewhere now""" if not result: - self.metrics.increment("client.notify_uaid_failure", - tags=self.base_tags) return node_id = result.get("node_id") @@ -837,9 +818,10 @@ def _verify_user_record(self): self.log.debug(format="Dropping User", code=104, uaid_hash=self.ps.uaid_hash, uaid_record=dump_uaid(record)) + tags = ['code:104'] + tags.extend(self.base_tags or []) + self.metrics.increment("ua.expiration", tags=tags) self.force_retry(self.db.router.drop_user, self.ps.uaid) - self.metrics.increment("client.drop_user", - tags={"errno": 104}) return None # Validate webpush records @@ -852,8 +834,9 @@ def _verify_user_record(self): uaid_record=dump_uaid(record)) self.force_retry(self.db.router.drop_user, self.ps.uaid) - self.metrics.increment("client.drop_user", - tags={"errno": 105}) + tags = ['code:105'] + tags.extend(self.base_tags or []) + self.metrics.increment("ua.expiration", tags=tags) return None # Determine if message table rotation is needed @@ -942,7 +925,7 @@ def finish_hello(self, previous): self.sendJSON(msg) self.log.debug(format="hello", uaid_hash=self.ps.uaid_hash, **self.ps.raw_agent) - self.metrics.increment("updates.client.hello", tags=self.base_tags) + self.metrics.increment("ua.command.hello", tags=self.base_tags) self.process_notifications() def process_notifications(self): @@ -1120,8 +1103,10 @@ def finish_webpush_notifications(self, result): if self.sent_notification_count > self.ap_settings.msg_limit: raise MessageOverloadException() if notif.topic: - self.metrics.increment("updates.notification.topic", + self.metrics.increment("notification.topic", tags=self.base_tags) + self.metrics.gauge('ua.message_data', len(msg.get('data', '')), + tags=make_tags(source=notif.source)) self.sendJSON(msg) # Did we send any messages? 
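(Reviewer sketch) The `ua.message_data` gauge in the hunk above depends on the `source` attribute this patch adds to `WebPushNotification` (default `"Direct"`, overridden to `"Stored"` in `from_message_table`), so live and replayed deliveries are separated by tag rather than by metric name. A rough illustration, assuming the patched `autopush` package is importable; the `Notif` class is a hypothetical stand-in that models only the fields used here:

```python
from autopush.metrics import make_tags


class Notif(object):
    """Hypothetical stand-in for WebPushNotification's metrics-relevant fields."""

    def __init__(self, data, source="Direct"):
        self.data = data
        self.source = source

    @property
    def data_length(self):
        """Length of the (possibly empty) payload, as in autopush/utils.py."""
        return len(self.data or "")


for notif in (Notif("aGVsbG8"), Notif("aGVsbG8", source="Stored")):
    # mirrors: self.metrics.gauge('ua.message_data', ..., tags=make_tags(...))
    print("ua.message_data %d %r"
          % (notif.data_length, make_tags(source=notif.source)))
# ua.message_data 7 ['source:Direct']
# ua.message_data 7 ['source:Stored']
```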
@@ -1194,7 +1179,6 @@ def error_monthly_rotation_overload(self, fail): def _send_ping(self): """Helper for ping sending that tracks when the ping was sent""" self.ps.last_ping = time.time() - self.metrics.increment("updates.client.ping", tags=self.base_tags) return self.sendMessage("{}", False) def process_ping(self): @@ -1270,7 +1254,7 @@ def send_register_finish(self, result, endpoint, chid): "status": 200 } self.sendJSON(msg) - self.metrics.increment("updates.client.register", tags=self.base_tags) + self.metrics.increment("ua.command.register", tags=self.base_tags) self.ps.stats.registers += 1 self.log.debug(format="Register", channel_id=chid, endpoint=endpoint, @@ -1288,7 +1272,7 @@ def process_unregister(self, data): except ValueError: return self.bad_message("unregister", "Invalid ChannelID") - self.metrics.increment("updates.client.unregister", + self.metrics.increment("ua.command.unregister", tags=self.base_tags) self.ps.stats.unregisters += 1 event = dict(format="Unregister", channel_id=chid, @@ -1451,7 +1435,7 @@ def process_ack(self, data): if not updates or not isinstance(updates, list): return - self.metrics.increment("updates.client.ack", tags=self.base_tags) + self.metrics.increment("ua.command.ack", tags=self.base_tags) defers = filter(None, map(self.ack_update, updates)) if defers: @@ -1471,6 +1455,9 @@ def process_nack(self, data): self.log.debug(format="Nack", uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, message_id=str(version), code=code, **self.ps.raw_agent) + tags = ["code:401"] + tags.extend(self.base_tags or []) + self.metrics.increment('ua.command.nack', tags=tags) self.ps.stats.nacks += 1 def check_missed_notifications(self, results, resume=False): @@ -1559,17 +1546,6 @@ def __init__(self, ap_settings, db, agent, clients): closeHandshakeTimeout=ap_settings.close_handshake_timeout, ) - def periodic_reporter(self, metrics): - # type: (IMetrics) -> None - """Twisted Task function that runs every few seconds to emit general - metrics regarding twisted and client counts. 
- - """ - metrics.gauge("update.client.writers", len(reactor.getWriters())) - metrics.gauge("update.client.readers", len(reactor.getReaders())) - metrics.gauge("update.client.connections", len(self.clients)) - metrics.gauge("update.client.ws_connections", self.countConnections) - class RouterHandler(BaseHandler): """Router Handler @@ -1587,20 +1563,17 @@ def put(self, uaid): client = self.application.clients.get(uaid) if not client: self.set_status(404, reason=None) - self.metrics.increment("updates.router.disconnected") self.write("Client not connected.") return if client.paused: self.set_status(503, reason=None) - self.metrics.increment("updates.router.busy") self.write("Client busy.") return update = json.loads(self.request.body) client.send_notification(update) - self.metrics.increment("updates.router.received") self.write("Client accepted for delivery") @@ -1616,7 +1589,6 @@ def put(self, uaid, *args): client = self.application.clients.get(uaid) if not client: self.set_status(404, reason=None) - self.metrics.increment("updates.notification.disconnected") self.write("Client not connected.") return @@ -1624,13 +1596,12 @@ def put(self, uaid, *args): # Client already busy waiting for stuff, flag for check client._check_notifications = True self.set_status(202) - self.metrics.increment("updates.notification.flagged") self.write("Flagged for Notification check") return # Client is online and idle, start a notification check client.process_notifications() - self.metrics.increment("updates.notification.checking") + self.metrics.increment("ua.notification_check") self.write("Notification check started") def delete(self, uaid, connected_at):
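(Reviewer sketch) Taken together, the endpoint handlers now emit a single `notification.message.success` / `notification.message.error` pair and push the interesting dimensions (destination, router, VAPID use, status code) into tags. A hedged sketch of that shape, using the no-op `SinkMetrics` backend so it runs without a statsd/Datadog client; `record_result` is an illustrative helper, not part of the patch:

```python
from autopush.metrics import SinkMetrics, make_tags


def record_result(metrics, status_code, router_type, vapid=None):
    """Illustrative mirror of the _router_completed / _router_fail_err tagging."""
    if status_code in (200, 201, 202):
        metrics.increment('notification.message.success',
                          tags=make_tags(
                              destination='Stored' if status_code == 202
                              else 'Direct',
                              router=router_type,
                              vapid=(vapid is not None)))
    else:
        metrics.increment('notification.message.error',
                          tags=["code:{}".format(status_code),
                                "router:{}".format(router_type),
                                "vapid:{}".format(vapid is not None)])


metrics = SinkMetrics()
record_result(metrics, 201, "webpush")   # delivered directly
record_result(metrics, 410, "gcm")       # bridge reports an uninstalled client
```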