From 1e331fe25f498b3921d4ede5b1c96da70a3a7694 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Thu, 29 Jun 2017 17:08:21 -0700 Subject: [PATCH] WIP: Add modern metrics Closes #943 --- autopush/db.py | 14 +-------- autopush/main.py | 1 - autopush/tests/test_websocket.py | 27 +++-------------- autopush/web/registration.py | 5 ++-- autopush/web/simplepush.py | 2 +- autopush/web/webpush.py | 2 +- autopush/websocket.py | 50 +++++--------------------------- 7 files changed, 16 insertions(+), 85 deletions(-) diff --git a/autopush/db.py b/autopush/db.py index 7060598e..211e71d8 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -298,17 +298,7 @@ def track_provisioned(func): def wrapper(self, *args, **kwargs): if TRACK_DB_CALLS: DB_CALLS.append(func.__name__) - try: - return func(self, *args, **kwargs) - except ProvisionedThroughputExceededException: - self.metrics.increment("error.provisioned.%s" % func.__name__) - raise - except JSONResponseError: - self.metrics.increment("error.jsonresponse.%s" % func.__name__) - raise - except BotoServerError: - self.metrics.increment("error.botoserver.%s" % func.__name__) - raise + return func(self, *args, **kwargs) return wrapper @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None): chid=normalize_id(chid)) return True except ProvisionedThroughputExceededException: - self.metrics.increment("error.provisioned.delete_notification") return False @@ -678,7 +667,6 @@ def get_uaid(self, uaid): # We unfortunately have to catch this here, as track_provisioned # will not see this, since JSONResponseError is a subclass and # will capture it - self.metrics.increment("error.provisioned.get_uaid") raise except JSONResponseError: # pragma: nocover # We trap JSONResponseError because Moto returns text instead of diff --git a/autopush/main.py b/autopush/main.py index 0a207348..9d6df1bf 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -260,7 +260,6 @@ def add_websocket(self): self.clients) site_factory = self.websocket_site_factory(settings, ws_factory) self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf()) - self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics) @classmethod def from_argparse(cls, ns): diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index ae894e57..de455669 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -218,7 +218,6 @@ def test_nuke_connection(self, mock_reactor): self.proto.state = "" self.proto.ps.uaid = uuid.uuid4().hex self.proto.nukeConnection() - ok_(self.proto.metrics.increment.called) @patch("autopush.websocket.reactor") def test_nuke_connection_shutdown_ran(self, mock_reactor): @@ -261,17 +260,6 @@ def test_base_tags(self): 'ua_browser_family:Firefox', 'host:example.com:8080'])) - def test_reporter(self): - self.metrics.reset_mock() - self.factory.periodic_reporter(self.metrics) - - # Verify metric increase of nothing - calls = self.metrics.method_calls - eq_(len(calls), 4) - name, args, _ = calls[0] - eq_(name, "gauge") - eq_(args, ("update.client.writers", 0)) - def test_handshake_sub(self): self.factory.externalPort = 80 @@ -414,7 +402,7 @@ def test_close_with_delivery_cleanup_using_webpush(self): yield self._wait_for(lambda: self.mock_agent.mock_calls) self.flushLoggedErrors() - @inlineCallbacks + #@inlineCallbacks def test_close_with_delivery_cleanup_and_get_no_result(self): self._connect() self.proto.ps.uaid = uuid.uuid4().hex @@ -428,16 +416,12 @@ def test_close_with_delivery_cleanup_and_get_no_result(self): self.proto.db.storage.save_notification = Mock() self.proto.db.router.get_uaid = mock_get = Mock() mock_get.return_value = False - self.metrics.reset_mock() # Close the connection self.proto.onClose(True, None, None) - yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2) - eq_(len(self.metrics.mock_calls), 3) - self.metrics.increment.assert_called_with( - "client.notify_uaid_failure", tags=None) + #yield self._wait_for_close() - @inlineCallbacks + #@inlineCallbacks def test_close_with_delivery_cleanup_and_get_uaid_error(self): self._connect() self.proto.ps.uaid = uuid.uuid4().hex @@ -459,10 +443,7 @@ def raise_item(*args, **kwargs): # Close the connection self.proto.onClose(True, None, None) - yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2) - eq_(len(self.metrics.mock_calls), 3) - self.metrics.increment.assert_called_with( - "client.lookup_uaid_failure", tags=None) + #yield self._wait_for_close() @inlineCallbacks def test_close_with_delivery_cleanup_and_no_node_id(self): diff --git a/autopush/web/registration.py b/autopush/web/registration.py index f13ae4ec..c85b64b5 100644 --- a/autopush/web/registration.py +++ b/autopush/web/registration.py @@ -337,8 +337,7 @@ def post(self, router_type, router_data): Router type/data registration. """ - self.metrics.increment("updates.client.register", - tags=self.base_tags()) + self.metrics.increment("ua.command.register", tags=self.base_tags()) uaid = uuid.uuid4() @@ -455,7 +454,7 @@ class ChannelRegistrationHandler(BaseRegistrationHandler): @threaded_validate(UnregisterChidSchema) def delete(self, uaid, chid): # type: (uuid.UUID, str) -> Deferred - self.metrics.increment("updates.client.unregister", + self.metrics.increment("ua.command.unregister", tags=self.base_tags()) d = deferToThread(self._delete_channel, uaid, chid) d.addCallback(self._success) diff --git a/autopush/web/simplepush.py b/autopush/web/simplepush.py index 6f5cc469..7dedd51f 100644 --- a/autopush/web/simplepush.py +++ b/autopush/web/simplepush.py @@ -135,7 +135,7 @@ def _router_completed(self, response, uaid_data, warning=""): self.log.debug(format="Router miss, message stored.", client_info=self._client_info) time_diff = time.time() - self._start_time - self.metrics.timing("updates.handled", duration=time_diff) + self.metrics.timing("notification.request_time", duration=time_diff) response.response_body = ( response.response_body + " " + warning).strip() self._router_response(response) diff --git a/autopush/web/webpush.py b/autopush/web/webpush.py index f97db29a..55533243 100644 --- a/autopush/web/webpush.py +++ b/autopush/web/webpush.py @@ -513,7 +513,7 @@ def _router_completed(self, response, uaid_data, warning=""): self.log.debug( format="Router miss, message stored.", client_info=self._client_info) - self.metrics.timing("updates.handled", duration=time_diff) + self.metrics.timing("notification.request_time", duration=time_diff) response.response_body = ( response.response_body + " " + warning).strip() self._router_response(response) diff --git a/autopush/websocket.py b/autopush/websocket.py index b8864030..6b9bdfd3 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -230,9 +230,6 @@ def __init__(self, db, request): if host: self._base_tags.append("host:%s" % host) - db.metrics.increment("client.socket.connect", - tags=self._base_tags or None) - self._should_stop = False self._paused = False self.uaid = None @@ -453,13 +450,9 @@ def _sendAutoPing(self): """Override for sanity checking during auto-ping interval""" if not self.ps.uaid: # No uaid yet, drop the connection - self.metrics.increment("client.autoping.no_uaid", - tags=self.base_tags) self.sendClose() elif self.factory.clients.get(self.ps.uaid) != self: # UAID, but we're not in clients anymore for some reason - self.metrics.increment("client.autoping.invalid_client", - tags=self.base_tags) self.sendClose() return WebSocketServerProtocol._sendAutoPing(self) @@ -476,14 +469,7 @@ def nukeConnection(self): still hadn't run by this point""" # Did onClose get called? If so, we shutdown properly, no worries. if hasattr(self, "_shutdown_ran"): - self.metrics.increment("client.success.sendClose", - tags=self.base_tags) return - - # Uh-oh, we have not been shut-down properly, report detailed data - self.metrics.increment("client.error.sendClose_failed", - tags=self.base_tags) - self.transport.abortConnection() @log_exception @@ -605,9 +591,8 @@ def onClose(self, wasClean, code, reason): def cleanUp(self, wasClean, code, reason): """Thorough clean-up method to cancel all remaining deferreds, and send connection metrics in""" - self.metrics.increment("client.socket.disconnect", tags=self.base_tags) elapsed = (ms_time() - self.ps.connected_at) / 1000.0 - self.metrics.timing("client.socket.lifespan", duration=elapsed, + self.metrics.timing("ua.connection.lifespan", duration=elapsed, tags=self.base_tags) self.ps.stats.connection_time = int(elapsed) @@ -678,8 +663,6 @@ def _trap_uaid_not_found(self, fail): # type: (failure.Failure) -> None """Traps UAID not found error""" fail.trap(ItemNotFound) - self.metrics.increment("client.lookup_uaid_failure", - tags=self.base_tags) def _notify_node(self, result): """Checks the result of lookup node to send the notify if the client is @@ -857,8 +840,6 @@ def _verify_user_record(self): uaid_hash=self.ps.uaid_hash, uaid_record=dump_uaid(record)) self.force_retry(self.db.router.drop_user, self.ps.uaid) - self.metrics.increment("client.drop_user", - tags={"errno": 104}) return None # Validate webpush records @@ -961,7 +942,7 @@ def finish_hello(self, previous): self.sendJSON(msg) self.log.debug(format="hello", uaid_hash=self.ps.uaid_hash, **self.ps.raw_agent) - self.metrics.increment("updates.client.hello", tags=self.base_tags) + self.metrics.increment("ua.command.hello", tags=self.base_tags) self.process_notifications() def process_notifications(self): @@ -1139,7 +1120,7 @@ def finish_webpush_notifications(self, result): if self.sent_notification_count > self.ap_settings.msg_limit: raise MessageOverloadException() if notif.topic: - self.metrics.increment("updates.notification.topic", + self.metrics.increment("notification.topic", tags=self.base_tags) self.sendJSON(msg) @@ -1213,7 +1194,6 @@ def error_monthly_rotation_overload(self, fail): def _send_ping(self): """Helper for ping sending that tracks when the ping was sent""" self.ps.last_ping = time.time() - self.metrics.increment("updates.client.ping", tags=self.base_tags) return self.sendMessage("{}", False) def process_ping(self): @@ -1289,7 +1269,7 @@ def send_register_finish(self, result, endpoint, chid): "status": 200 } self.sendJSON(msg) - self.metrics.increment("updates.client.register", tags=self.base_tags) + self.metrics.increment("ua.command.register", tags=self.base_tags) self.ps.stats.registers += 1 self.log.debug(format="Register", channel_id=chid, endpoint=endpoint, @@ -1307,7 +1287,7 @@ def process_unregister(self, data): except ValueError: return self.bad_message("unregister", "Invalid ChannelID") - self.metrics.increment("updates.client.unregister", + self.metrics.increment("ua.command.unregister", tags=self.base_tags) self.ps.stats.unregisters += 1 event = dict(format="Unregister", channel_id=chid, @@ -1470,7 +1450,7 @@ def process_ack(self, data): if not updates or not isinstance(updates, list): return - self.metrics.increment("updates.client.ack", tags=self.base_tags) + self.metrics.increment("ua.command.ack", tags=self.base_tags) defers = filter(None, map(self.ack_update, updates)) if defers: @@ -1578,17 +1558,6 @@ def __init__(self, ap_settings, db, agent, clients): closeHandshakeTimeout=ap_settings.close_handshake_timeout, ) - def periodic_reporter(self, metrics): - # type: (IMetrics) -> None - """Twisted Task function that runs every few seconds to emit general - metrics regarding twisted and client counts. - - """ - metrics.gauge("update.client.writers", len(reactor.getWriters())) - metrics.gauge("update.client.readers", len(reactor.getReaders())) - metrics.gauge("update.client.connections", len(self.clients)) - metrics.gauge("update.client.ws_connections", self.countConnections) - class RouterHandler(BaseHandler): """Router Handler @@ -1606,20 +1575,17 @@ def put(self, uaid): client = self.application.clients.get(uaid) if not client: self.set_status(404, reason=None) - self.metrics.increment("updates.router.disconnected") self.write("Client not connected.") return if client.paused: self.set_status(503, reason=None) - self.metrics.increment("updates.router.busy") self.write("Client busy.") return update = json.loads(self.request.body) client.send_notification(update) - self.metrics.increment("updates.router.received") self.write("Client accepted for delivery") @@ -1635,7 +1601,6 @@ def put(self, uaid, *args): client = self.application.clients.get(uaid) if not client: self.set_status(404, reason=None) - self.metrics.increment("updates.notification.disconnected") self.write("Client not connected.") return @@ -1643,13 +1608,12 @@ def put(self, uaid, *args): # Client already busy waiting for stuff, flag for check client._check_notifications = True self.set_status(202) - self.metrics.increment("updates.notification.flagged") self.write("Flagged for Notification check") return # Client is online and idle, start a notification check client.process_notifications() - self.metrics.increment("updates.notification.checking") + self.metrics.increment("ua.notification_check") self.write("Notification check started") def delete(self, uaid, connected_at):