diff --git a/autopush/router/apnsrouter.py b/autopush/router/apnsrouter.py index 6e1f32c9..751a10bd 100644 --- a/autopush/router/apnsrouter.py +++ b/autopush/router/apnsrouter.py @@ -173,14 +173,15 @@ def _route(self, notification, router_data): application=rel_channel)) self.metrics.increment( - "updates.client.bridge.apns.%s.sent" % - router_data["rel_channel"], - self._base_tags + "updates.client.bridge.apns.{}.sent".format( + router_data["rel_channel"] + ), + tags=self._base_tags ) - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(self._base_tags, - destination='Direct')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination='Direct')) return RouterResponse(status_code=201, response_body="", headers={"TTL": notification.ttl, "Location": location}, diff --git a/autopush/router/fcm.py b/autopush/router/fcm.py index fe203ee5..eaa36ed9 100644 --- a/autopush/router/fcm.py +++ b/autopush/router/fcm.py @@ -267,11 +267,11 @@ def _process_reply(self, reply, notification, router_data, ttl): router_data={}, ) self.metrics.increment("notification.bridge.sent", - self._base_tags) - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(self._base_tags, - destination="Direct")) + tags=self._base_tags) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination="Direct")) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/gcm.py b/autopush/router/gcm.py index 0e6071f9..98e10dc1 100644 --- a/autopush/router/gcm.py +++ b/autopush/router/gcm.py @@ -193,11 +193,11 @@ def _process_reply(self, reply, uaid_data, ttl, notification): log_exception=False) self.metrics.increment("notification.bridge.sent", - self._base_tags) - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(self._base_tags, - destination='Direct')) + tags=self._base_tags) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination='Direct')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/simple.py b/autopush/router/simple.py index 97b18177..66fcd2f3 100644 --- a/autopush/router/simple.py +++ b/autopush/router/simple.py @@ -62,15 +62,15 @@ def amend_endpoint_response(self, response, router_data): """Stubbed out for this router""" def stored_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(destination='Stored')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Stored')) return RouterResponse(202, "Notification Stored") def delivered_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(destination='Direct')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Direct')) return RouterResponse(200, "Delivered") @inlineCallbacks diff --git a/autopush/router/webpush.py b/autopush/router/webpush.py index 4591fcc1..8e3f4807 100644 --- a/autopush/router/webpush.py +++ b/autopush/router/webpush.py @@ -25,9 +25,9 @@ class WebPushRouter(SimpleRouter): """SimpleRouter subclass to store individual messages appropriately""" def delivered_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(destination='Stored')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Stored')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", @@ -36,9 +36,9 @@ def delivered_response(self, notification): logged_status=200) def stored_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(destination='Direct')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Direct')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index c2aa9004..a9ce9645 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -722,8 +722,8 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog): ok_(self.logs.logged_ci(lambda ci: 'message_size' in ci), "message_size not logged") - eq_(self.conn.db.metrics._client.gauge.call_args[1]['tags'], - ['source:Stored']) + inc_call = self.conn.db.metrics._client.increment.call_args_list[5] + eq_(inc_call[1]['tags'], ['source:Stored']) yield self.shut_down(client) @inlineCallbacks @@ -1266,8 +1266,9 @@ def test_message_with_topic(self): client = yield self.quick_register(use_webpush=True) yield client.send_notification(data=data, topic="topicname") self.conn.db.metrics.increment.assert_has_calls([ - call('updates.notification.topic', - tags=['host:localhost', 'use_webpush:True']) + call('ua.command.hello'), + call('ua.command.register'), + call('ua.notification.topic') ]) yield self.shut_down(client) diff --git a/autopush/tests/test_web_base.py b/autopush/tests/test_web_base.py index e29f5ac1..d87c904f 100644 --- a/autopush/tests/test_web_base.py +++ b/autopush/tests/test_web_base.py @@ -218,14 +218,14 @@ def test_boto_err(self): def test_router_response(self): from autopush.router.interface import RouterResponse response = RouterResponse(headers=dict(Location="http://a.com/")) - self.base._router_response(response) + self.base._router_response(response, None, None) self.status_mock.assert_called_with(200, reason=None) def test_router_response_client_error(self): from autopush.router.interface import RouterResponse response = RouterResponse(headers=dict(Location="http://a.com/"), status_code=400) - self.base._router_response(response) + self.base._router_response(response, None, None) self.status_mock.assert_called_with(400, reason=None) def test_router_fail_err(self): diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index cb0438fa..d9935ae8 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -1755,7 +1755,7 @@ def test_notif_finished_with_too_many_messages(self): d = Deferred() def check(*args, **kwargs): - eq_(self.metrics.gauge.call_args[1]['tags'], ["source:Direct"]) + eq_(self.metrics.increment.call_args[1]['tags'], ["source:Direct"]) ok_(self.proto.force_retry.called) ok_(self.send_mock.called) d.callback(True) diff --git a/autopush/web/base.py b/autopush/web/base.py index bd70d772..20c84d3e 100644 --- a/autopush/web/base.py +++ b/autopush/web/base.py @@ -154,6 +154,7 @@ def initialize(self): self._base_tags = {} self._start_time = time.time() self._timings = {} + self._handling_message = False @property def routers(self): @@ -186,7 +187,9 @@ def head(self, *args, **kwargs): ############################################################# def _write_response(self, status_code, errno, message=None, error=None, headers=None, - url=DEFAULT_ERR_URL): + url=DEFAULT_ERR_URL, + router_type=None, + vapid=None): """Writes out a full JSON error and sets the appropriate status""" self.set_status(status_code, reason=error) error_data = dict( @@ -207,6 +210,13 @@ def _write_response(self, status_code, errno, message=None, error=None, if status_code == 410: self.set_header("Cache-Control", "max-age=86400") + if self._handling_message and status_code >= 300: + self.metrics.increment('notification.message.error', + tags=[ + "code:{}".format(status_code), + "router:{}".format(router_type), + "vapid:{}".format(vapid is not None) + ]) self._track_timing() self.finish() @@ -255,7 +265,7 @@ def _boto_err(self, fail): self._write_response(503, errno=202, message="Communication error, please retry") - def _router_response(self, response): + def _router_response(self, response, router_type, vapid): for name, val in response.headers.items(): if val is not None: self.set_header(name, val) @@ -263,19 +273,33 @@ def _router_response(self, response): if 200 <= response.status_code < 300: self.set_status(response.status_code, reason=None) self.write(response.response_body) + + dest = 'Direct' + if response.status_code == 202 or response.logged_status == 202: + dest = 'Stored' + + if self._handling_message: + self.metrics.increment('notification.message.success', + tags=[ + 'destination:{}'.format(dest), + 'router:{}'.format(router_type), + 'vapid:{}'.format(vapid is not None) + ]) self._track_timing(status_code=response.logged_status) self.finish() else: self._write_response( response.status_code, errno=response.errno or 999, - message=response.response_body) + message=response.response_body, + router_type=router_type, + vapid=vapid + ) def _router_fail_err(self, fail, router_type=None, vapid=False): """errBack for router failures""" fail.trap(RouterException) exc = fail.value - success = False if exc.log_exception: if exc.status_code >= 500: fmt = fail.value.message or 'Exception' @@ -288,27 +312,13 @@ def _router_fail_err(self, fail, router_type=None, vapid=False): self.log.debug(format="Success", status_code=exc.status_code, logged_status=exc.logged_status or 0, client_info=self._client_info) - success = True - self.metrics.increment('notification.message.success', - tags=[ - 'destination:Direct', - 'router:{}'.format(router_type), - 'vapid:{}'.format(vapid is not None) - ]) elif 400 <= exc.status_code < 500: self.log.debug(format="Client error", status_code=exc.status_code, logged_status=exc.logged_status or 0, errno=exc.errno or 0, client_info=self._client_info) - if not success: - self.metrics.increment('notification.message.error', - tags=[ - "code:{}".format(exc.status_code), - "router:{}".format(router_type), - "vapid:{}".format(vapid is not None) - ]) - self._router_response(exc) + self._router_response(exc, router_type, vapid) def _write_validation_err(self, errors): """Writes a set of validation errors out with details about what diff --git a/autopush/web/simplepush.py b/autopush/web/simplepush.py index fd31d216..1c0c8781 100644 --- a/autopush/web/simplepush.py +++ b/autopush/web/simplepush.py @@ -21,7 +21,6 @@ ) from autopush.db import hasher -from autopush.metrics import make_tags from autopush.web.base import ( threaded_validate, Notification, @@ -104,6 +103,11 @@ def extract_fields(self, d): class SimplePushHandler(BaseWebHandler): cors_methods = "PUT" + def initialize(self): + """Must run on initialization to set ahead of validation""" + super(SimplePushHandler, self).initialize() + self._handling_message = True + @threaded_validate(SimplePushRequestSchema) def put(self, subscription, version, data): # type: (Dict[str, Any], str, str) -> Deferred @@ -129,21 +133,14 @@ def put(self, subscription, version, data): def _router_completed(self, response, uaid_data, warning=""): """Called after router has completed successfully""" - dest = 'Direct' if response.status_code == 200 or response.logged_status == 200: self.log.debug(format="Successful delivery", client_info=self._client_info) elif response.status_code == 202 or response.logged_status == 202: self.log.debug(format="Router miss, message stored.", client_info=self._client_info) - dest = 'Stored' time_diff = time.time() - self._start_time self.metrics.timing("notification.request_time", duration=time_diff) - self.metrics.increment('notification.message.success', - tags=make_tags( - destination=dest, - router='simplepush') - ) response.response_body = ( response.response_body + " " + warning).strip() - self._router_response(response) + self._router_response(response, router_type="simplepush", vapid=None) diff --git a/autopush/web/webpush.py b/autopush/web/webpush.py index b812fc79..7753edbf 100644 --- a/autopush/web/webpush.py +++ b/autopush/web/webpush.py @@ -441,6 +441,11 @@ class WebPushHandler(BaseWebHandler): "authorization") cors_response_headers = ("location", "www-authenticate") + def initialize(self): + """Must run on initialization to set ahead of validation""" + super(WebPushHandler, self).initialize() + self._handling_message = True + @threaded_validate(WebPushRequestSchema) def post(self, subscription, # type: Dict[str, Any] @@ -502,7 +507,9 @@ def _router_completed(self, response, uaid_data, warning="", uaid_record=dump_uaid(uaid_data), client_info=self._client_info) d = deferToThread(self.db.router.drop_user, uaid_data["uaid"]) - d.addCallback(lambda x: self._router_response(response)) + d.addCallback(lambda x: self._router_response(response, + router_type, + vapid)) return d # The router data needs to be updated to include any changes # requested by the bridge system @@ -520,7 +527,6 @@ def _router_completed(self, response, uaid_data, warning="", return d else: # No changes are requested by the bridge system, proceed as normal - dest = 'Direct' if response.status_code == 200 or response.logged_status == 200: self.log.debug(format="Successful delivery", client_info=self._client_info) @@ -528,16 +534,8 @@ def _router_completed(self, response, uaid_data, warning="", self.log.debug( format="Router miss, message stored.", client_info=self._client_info) - dest = 'Stored' self.metrics.timing("notification.request_time", duration=time_diff) - self.metrics.increment('notification.message.success', - tags=make_tags( - destination=dest, - router=router_type, - vapid=(vapid is not None)) - ) - response.response_body = ( response.response_body + " " + warning).strip() - self._router_response(response) + self._router_response(response, router_type, vapid) diff --git a/autopush/websocket.py b/autopush/websocket.py index 1b9323ca..fa9afc5b 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -819,7 +819,6 @@ def _verify_user_record(self): uaid_hash=self.ps.uaid_hash, uaid_record=dump_uaid(record)) tags = ['code:104'] - tags.extend(self.base_tags or []) self.metrics.increment("ua.expiration", tags=tags) self.force_retry(self.db.router.drop_user, self.ps.uaid) return None @@ -835,7 +834,6 @@ def _verify_user_record(self): self.force_retry(self.db.router.drop_user, self.ps.uaid) tags = ['code:105'] - tags.extend(self.base_tags or []) self.metrics.increment("ua.expiration", tags=tags) return None @@ -925,7 +923,7 @@ def finish_hello(self, previous): self.sendJSON(msg) self.log.debug(format="hello", uaid_hash=self.ps.uaid_hash, **self.ps.raw_agent) - self.metrics.increment("ua.command.hello", tags=self.base_tags) + self.metrics.increment("ua.command.hello") self.process_notifications() def process_notifications(self): @@ -1103,10 +1101,9 @@ def finish_webpush_notifications(self, result): if self.sent_notification_count > self.ap_settings.msg_limit: raise MessageOverloadException() if notif.topic: - self.metrics.increment("notification.topic", - tags=self.base_tags) - self.metrics.gauge('ua.message_data', len(msg.get('data', '')), - tags=make_tags(source=notif.source)) + self.metrics.increment("ua.notification.topic") + self.metrics.increment('ua.message_data', len(msg.get('data', '')), + tags=make_tags(source=notif.source)) self.sendJSON(msg) # Did we send any messages? @@ -1254,7 +1251,7 @@ def send_register_finish(self, result, endpoint, chid): "status": 200 } self.sendJSON(msg) - self.metrics.increment("ua.command.register", tags=self.base_tags) + self.metrics.increment("ua.command.register") self.ps.stats.registers += 1 self.log.info(format="Register", channel_id=chid, endpoint=endpoint, @@ -1272,8 +1269,7 @@ def process_unregister(self, data): except ValueError: return self.bad_message("unregister", "Invalid ChannelID") - self.metrics.increment("ua.command.unregister", - tags=self.base_tags) + self.metrics.increment("ua.command.unregister") self.ps.stats.unregisters += 1 event = dict(format="Unregister", channel_id=chid, uaid_hash=self.ps.uaid_hash, @@ -1435,7 +1431,7 @@ def process_ack(self, data): if not updates or not isinstance(updates, list): return - self.metrics.increment("ua.command.ack", tags=self.base_tags) + self.metrics.increment("ua.command.ack") defers = filter(None, map(self.ack_update, updates)) if defers: @@ -1456,7 +1452,6 @@ def process_nack(self, data): user_agent=self.ps.user_agent, message_id=str(version), code=code, **self.ps.raw_agent) tags = ["code:401"] - tags.extend(self.base_tags or []) self.metrics.increment('ua.command.nack', tags=tags) self.ps.stats.nacks += 1 @@ -1516,8 +1511,7 @@ def send_notification(self, update): update) self.ps.direct_updates[chid].append(notif) if notif.topic: - self.metrics.increment("updates.notification.topic", - tags=self.base_tags) + self.metrics.increment("ua.notification.topic") self.sendJSON(notif.websocket_format()) else: self.ps.direct_updates[chid] = version