From 637e246fd0dce1fc47e430f386279a708426d8cc Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Wed, 19 Jul 2017 09:49:43 -0700 Subject: [PATCH] fix: move metric increments to lowest callbacks The metric increments were being called for registration API calls due to an error callback. They also weren't being called for success cases. Moving them to the lower callbacks with a new flag should help ensure they're incremented correctly. Also fixed message_data calls to use increment instead of a gauge and removed sending of base_tags in websocket.py to avoid too many metric values. Closes #958 --- autopush/router/apnsrouter.py | 15 +++++----- autopush/router/fcm.py | 10 +++---- autopush/router/gcm.py | 10 +++---- autopush/router/simple.py | 12 ++++---- autopush/router/webpush.py | 12 ++++---- autopush/tests/test_integration.py | 9 +++--- autopush/tests/test_web_base.py | 4 +-- autopush/tests/test_websocket.py | 2 +- autopush/web/base.py | 48 ++++++++++++++++++------------ autopush/web/simplepush.py | 15 ++++------ autopush/web/webpush.py | 20 ++++++------- autopush/websocket.py | 22 +++++--------- 12 files changed, 90 insertions(+), 89 deletions(-) diff --git a/autopush/router/apnsrouter.py b/autopush/router/apnsrouter.py index 6e1f32c9..751a10bd 100644 --- a/autopush/router/apnsrouter.py +++ b/autopush/router/apnsrouter.py @@ -173,14 +173,15 @@ def _route(self, notification, router_data): application=rel_channel)) self.metrics.increment( - "updates.client.bridge.apns.%s.sent" % - router_data["rel_channel"], - self._base_tags + "updates.client.bridge.apns.{}.sent".format( + router_data["rel_channel"] + ), + tags=self._base_tags ) - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(self._base_tags, - destination='Direct')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination='Direct')) return RouterResponse(status_code=201, response_body="", 
headers={"TTL": notification.ttl, "Location": location}, diff --git a/autopush/router/fcm.py b/autopush/router/fcm.py index fe203ee5..eaa36ed9 100644 --- a/autopush/router/fcm.py +++ b/autopush/router/fcm.py @@ -267,11 +267,11 @@ def _process_reply(self, reply, notification, router_data, ttl): router_data={}, ) self.metrics.increment("notification.bridge.sent", - self._base_tags) - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(self._base_tags, - destination="Direct")) + tags=self._base_tags) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination="Direct")) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/gcm.py b/autopush/router/gcm.py index 0e6071f9..98e10dc1 100644 --- a/autopush/router/gcm.py +++ b/autopush/router/gcm.py @@ -193,11 +193,11 @@ def _process_reply(self, reply, uaid_data, ttl, notification): log_exception=False) self.metrics.increment("notification.bridge.sent", - self._base_tags) - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(self._base_tags, - destination='Direct')) + tags=self._base_tags) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(self._base_tags, + destination='Direct')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/simple.py b/autopush/router/simple.py index 97b18177..66fcd2f3 100644 --- a/autopush/router/simple.py +++ b/autopush/router/simple.py @@ -62,15 +62,15 @@ def amend_endpoint_response(self, response, router_data): """Stubbed out for this router""" def stored_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - 
tags=make_tags(destination='Stored')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Stored')) return RouterResponse(202, "Notification Stored") def delivered_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(destination='Direct')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Direct')) return RouterResponse(200, "Delivered") @inlineCallbacks diff --git a/autopush/router/webpush.py b/autopush/router/webpush.py index 4591fcc1..8e3f4807 100644 --- a/autopush/router/webpush.py +++ b/autopush/router/webpush.py @@ -25,9 +25,9 @@ class WebPushRouter(SimpleRouter): """SimpleRouter subclass to store individual messages appropriately""" def delivered_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(destination='Stored')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Stored')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", @@ -36,9 +36,9 @@ def delivered_response(self, notification): logged_status=200) def stored_response(self, notification): - self.metrics.gauge("notification.message_data", - notification.data_length, - tags=make_tags(destination='Direct')) + self.metrics.increment("notification.message_data", + notification.data_length, + tags=make_tags(destination='Direct')) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index c2aa9004..a9ce9645 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -722,8 +722,8 @@ def 
test_webpush_data_delivery_to_disconnected_client(self, m_ddog): ok_(self.logs.logged_ci(lambda ci: 'message_size' in ci), "message_size not logged") - eq_(self.conn.db.metrics._client.gauge.call_args[1]['tags'], - ['source:Stored']) + inc_call = self.conn.db.metrics._client.increment.call_args_list[5] + eq_(inc_call[1]['tags'], ['source:Stored']) yield self.shut_down(client) @inlineCallbacks @@ -1266,8 +1266,9 @@ def test_message_with_topic(self): client = yield self.quick_register(use_webpush=True) yield client.send_notification(data=data, topic="topicname") self.conn.db.metrics.increment.assert_has_calls([ - call('updates.notification.topic', - tags=['host:localhost', 'use_webpush:True']) + call('ua.command.hello'), + call('ua.command.register'), + call('ua.notification.topic') ]) yield self.shut_down(client) diff --git a/autopush/tests/test_web_base.py b/autopush/tests/test_web_base.py index e29f5ac1..d87c904f 100644 --- a/autopush/tests/test_web_base.py +++ b/autopush/tests/test_web_base.py @@ -218,14 +218,14 @@ def test_boto_err(self): def test_router_response(self): from autopush.router.interface import RouterResponse response = RouterResponse(headers=dict(Location="http://a.com/")) - self.base._router_response(response) + self.base._router_response(response, None, None) self.status_mock.assert_called_with(200, reason=None) def test_router_response_client_error(self): from autopush.router.interface import RouterResponse response = RouterResponse(headers=dict(Location="http://a.com/"), status_code=400) - self.base._router_response(response) + self.base._router_response(response, None, None) self.status_mock.assert_called_with(400, reason=None) def test_router_fail_err(self): diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index cb0438fa..d9935ae8 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -1755,7 +1755,7 @@ def test_notif_finished_with_too_many_messages(self): d = Deferred() def 
check(*args, **kwargs): - eq_(self.metrics.gauge.call_args[1]['tags'], ["source:Direct"]) + eq_(self.metrics.increment.call_args[1]['tags'], ["source:Direct"]) ok_(self.proto.force_retry.called) ok_(self.send_mock.called) d.callback(True) diff --git a/autopush/web/base.py b/autopush/web/base.py index bd70d772..20c84d3e 100644 --- a/autopush/web/base.py +++ b/autopush/web/base.py @@ -154,6 +154,7 @@ def initialize(self): self._base_tags = {} self._start_time = time.time() self._timings = {} + self._handling_message = False @property def routers(self): @@ -186,7 +187,9 @@ def head(self, *args, **kwargs): ############################################################# def _write_response(self, status_code, errno, message=None, error=None, headers=None, - url=DEFAULT_ERR_URL): + url=DEFAULT_ERR_URL, + router_type=None, + vapid=None): """Writes out a full JSON error and sets the appropriate status""" self.set_status(status_code, reason=error) error_data = dict( @@ -207,6 +210,13 @@ def _write_response(self, status_code, errno, message=None, error=None, if status_code == 410: self.set_header("Cache-Control", "max-age=86400") + if self._handling_message and status_code >= 300: + self.metrics.increment('notification.message.error', + tags=[ + "code:{}".format(status_code), + "router:{}".format(router_type), + "vapid:{}".format(vapid is not None) + ]) self._track_timing() self.finish() @@ -255,7 +265,7 @@ def _boto_err(self, fail): self._write_response(503, errno=202, message="Communication error, please retry") - def _router_response(self, response): + def _router_response(self, response, router_type, vapid): for name, val in response.headers.items(): if val is not None: self.set_header(name, val) @@ -263,19 +273,33 @@ def _router_response(self, response): if 200 <= response.status_code < 300: self.set_status(response.status_code, reason=None) self.write(response.response_body) + + dest = 'Direct' + if response.status_code == 202 or response.logged_status == 202: + dest = 
'Stored' + + if self._handling_message: + self.metrics.increment('notification.message.success', + tags=[ + 'destination:{}'.format(dest), + 'router:{}'.format(router_type), + 'vapid:{}'.format(vapid is not None) + ]) self._track_timing(status_code=response.logged_status) self.finish() else: self._write_response( response.status_code, errno=response.errno or 999, - message=response.response_body) + message=response.response_body, + router_type=router_type, + vapid=vapid + ) def _router_fail_err(self, fail, router_type=None, vapid=False): """errBack for router failures""" fail.trap(RouterException) exc = fail.value - success = False if exc.log_exception: if exc.status_code >= 500: fmt = fail.value.message or 'Exception' @@ -288,27 +312,13 @@ def _router_fail_err(self, fail, router_type=None, vapid=False): self.log.debug(format="Success", status_code=exc.status_code, logged_status=exc.logged_status or 0, client_info=self._client_info) - success = True - self.metrics.increment('notification.message.success', - tags=[ - 'destination:Direct', - 'router:{}'.format(router_type), - 'vapid:{}'.format(vapid is not None) - ]) elif 400 <= exc.status_code < 500: self.log.debug(format="Client error", status_code=exc.status_code, logged_status=exc.logged_status or 0, errno=exc.errno or 0, client_info=self._client_info) - if not success: - self.metrics.increment('notification.message.error', - tags=[ - "code:{}".format(exc.status_code), - "router:{}".format(router_type), - "vapid:{}".format(vapid is not None) - ]) - self._router_response(exc) + self._router_response(exc, router_type, vapid) def _write_validation_err(self, errors): """Writes a set of validation errors out with details about what diff --git a/autopush/web/simplepush.py b/autopush/web/simplepush.py index fd31d216..1c0c8781 100644 --- a/autopush/web/simplepush.py +++ b/autopush/web/simplepush.py @@ -21,7 +21,6 @@ ) from autopush.db import hasher -from autopush.metrics import make_tags from autopush.web.base import ( 
threaded_validate, Notification, @@ -104,6 +103,11 @@ def extract_fields(self, d): class SimplePushHandler(BaseWebHandler): cors_methods = "PUT" + def initialize(self): + """Must run on initialization to set ahead of validation""" + super(SimplePushHandler, self).initialize() + self._handling_message = True + @threaded_validate(SimplePushRequestSchema) def put(self, subscription, version, data): # type: (Dict[str, Any], str, str) -> Deferred @@ -129,21 +133,14 @@ def put(self, subscription, version, data): def _router_completed(self, response, uaid_data, warning=""): """Called after router has completed successfully""" - dest = 'Direct' if response.status_code == 200 or response.logged_status == 200: self.log.debug(format="Successful delivery", client_info=self._client_info) elif response.status_code == 202 or response.logged_status == 202: self.log.debug(format="Router miss, message stored.", client_info=self._client_info) - dest = 'Stored' time_diff = time.time() - self._start_time self.metrics.timing("notification.request_time", duration=time_diff) - self.metrics.increment('notification.message.success', - tags=make_tags( - destination=dest, - router='simplepush') - ) response.response_body = ( response.response_body + " " + warning).strip() - self._router_response(response) + self._router_response(response, router_type="simplepush", vapid=None) diff --git a/autopush/web/webpush.py b/autopush/web/webpush.py index b812fc79..7753edbf 100644 --- a/autopush/web/webpush.py +++ b/autopush/web/webpush.py @@ -441,6 +441,11 @@ class WebPushHandler(BaseWebHandler): "authorization") cors_response_headers = ("location", "www-authenticate") + def initialize(self): + """Must run on initialization to set ahead of validation""" + super(WebPushHandler, self).initialize() + self._handling_message = True + @threaded_validate(WebPushRequestSchema) def post(self, subscription, # type: Dict[str, Any] @@ -502,7 +507,9 @@ def _router_completed(self, response, uaid_data, warning="", 
uaid_record=dump_uaid(uaid_data), client_info=self._client_info) d = deferToThread(self.db.router.drop_user, uaid_data["uaid"]) - d.addCallback(lambda x: self._router_response(response)) + d.addCallback(lambda x: self._router_response(response, + router_type, + vapid)) return d # The router data needs to be updated to include any changes # requested by the bridge system @@ -520,7 +527,6 @@ def _router_completed(self, response, uaid_data, warning="", return d else: # No changes are requested by the bridge system, proceed as normal - dest = 'Direct' if response.status_code == 200 or response.logged_status == 200: self.log.debug(format="Successful delivery", client_info=self._client_info) @@ -528,16 +534,8 @@ def _router_completed(self, response, uaid_data, warning="", self.log.debug( format="Router miss, message stored.", client_info=self._client_info) - dest = 'Stored' self.metrics.timing("notification.request_time", duration=time_diff) - self.metrics.increment('notification.message.success', - tags=make_tags( - destination=dest, - router=router_type, - vapid=(vapid is not None)) - ) - response.response_body = ( response.response_body + " " + warning).strip() - self._router_response(response) + self._router_response(response, router_type, vapid) diff --git a/autopush/websocket.py b/autopush/websocket.py index 1b9323ca..fa9afc5b 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -819,7 +819,6 @@ def _verify_user_record(self): uaid_hash=self.ps.uaid_hash, uaid_record=dump_uaid(record)) tags = ['code:104'] - tags.extend(self.base_tags or []) self.metrics.increment("ua.expiration", tags=tags) self.force_retry(self.db.router.drop_user, self.ps.uaid) return None @@ -835,7 +834,6 @@ def _verify_user_record(self): self.force_retry(self.db.router.drop_user, self.ps.uaid) tags = ['code:105'] - tags.extend(self.base_tags or []) self.metrics.increment("ua.expiration", tags=tags) return None @@ -925,7 +923,7 @@ def finish_hello(self, previous): self.sendJSON(msg) 
self.log.debug(format="hello", uaid_hash=self.ps.uaid_hash, **self.ps.raw_agent) - self.metrics.increment("ua.command.hello", tags=self.base_tags) + self.metrics.increment("ua.command.hello") self.process_notifications() def process_notifications(self): @@ -1103,10 +1101,9 @@ def finish_webpush_notifications(self, result): if self.sent_notification_count > self.ap_settings.msg_limit: raise MessageOverloadException() if notif.topic: - self.metrics.increment("notification.topic", - tags=self.base_tags) - self.metrics.gauge('ua.message_data', len(msg.get('data', '')), - tags=make_tags(source=notif.source)) + self.metrics.increment("ua.notification.topic") + self.metrics.increment('ua.message_data', len(msg.get('data', '')), + tags=make_tags(source=notif.source)) self.sendJSON(msg) # Did we send any messages? @@ -1254,7 +1251,7 @@ def send_register_finish(self, result, endpoint, chid): "status": 200 } self.sendJSON(msg) - self.metrics.increment("ua.command.register", tags=self.base_tags) + self.metrics.increment("ua.command.register") self.ps.stats.registers += 1 self.log.info(format="Register", channel_id=chid, endpoint=endpoint, @@ -1272,8 +1269,7 @@ def process_unregister(self, data): except ValueError: return self.bad_message("unregister", "Invalid ChannelID") - self.metrics.increment("ua.command.unregister", - tags=self.base_tags) + self.metrics.increment("ua.command.unregister") self.ps.stats.unregisters += 1 event = dict(format="Unregister", channel_id=chid, uaid_hash=self.ps.uaid_hash, @@ -1435,7 +1431,7 @@ def process_ack(self, data): if not updates or not isinstance(updates, list): return - self.metrics.increment("ua.command.ack", tags=self.base_tags) + self.metrics.increment("ua.command.ack") defers = filter(None, map(self.ack_update, updates)) if defers: @@ -1456,7 +1452,6 @@ def process_nack(self, data): user_agent=self.ps.user_agent, message_id=str(version), code=code, **self.ps.raw_agent) tags = ["code:401"] - tags.extend(self.base_tags or []) 
self.metrics.increment('ua.command.nack', tags=tags) self.ps.stats.nacks += 1 @@ -1516,8 +1511,7 @@ def send_notification(self, update): update) self.ps.direct_updates[chid].append(notif) if notif.topic: - self.metrics.increment("updates.notification.topic", - tags=self.base_tags) + self.metrics.increment("ua.notification.topic") self.sendJSON(notif.websocket_format()) else: self.ps.direct_updates[chid] = version