diff --git a/autopush/db.py b/autopush/db.py index 7060598e..8f330427 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -42,7 +42,7 @@ attrib, Factory ) -from boto.exception import JSONResponseError, BotoServerError +from boto.exception import JSONResponseError from boto.dynamodb2.exceptions import ( ConditionalCheckFailedException, ItemNotFound, @@ -298,17 +298,7 @@ def track_provisioned(func): def wrapper(self, *args, **kwargs): if TRACK_DB_CALLS: DB_CALLS.append(func.__name__) - try: - return func(self, *args, **kwargs) - except ProvisionedThroughputExceededException: - self.metrics.increment("error.provisioned.%s" % func.__name__) - raise - except JSONResponseError: - self.metrics.increment("error.jsonresponse.%s" % func.__name__) - raise - except BotoServerError: - self.metrics.increment("error.botoserver.%s" % func.__name__) - raise + return func(self, *args, **kwargs) return wrapper @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None): chid=normalize_id(chid)) return True except ProvisionedThroughputExceededException: - self.metrics.increment("error.provisioned.delete_notification") return False @@ -678,7 +667,6 @@ def get_uaid(self, uaid): # We unfortunately have to catch this here, as track_provisioned # will not see this, since JSONResponseError is a subclass and # will capture it - self.metrics.increment("error.provisioned.get_uaid") raise except JSONResponseError: # pragma: nocover # We trap JSONResponseError because Moto returns text instead of diff --git a/autopush/main.py b/autopush/main.py index fd2df944..e3bb82a3 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -260,7 +260,6 @@ def add_websocket(self): self.clients) site_factory = self.websocket_site_factory(settings, ws_factory) self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf()) - self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics) @classmethod def from_argparse(cls, ns): diff --git a/autopush/metrics.py b/autopush/metrics.py index 8ff0ccbb..60318c17 100644 --- a/autopush/metrics.py +++ b/autopush/metrics.py @@ -89,19 +89,34 @@ def __init__(self, api_key, app_key, hostname, flush_interval=10, def _prefix_name(self, name): return "%s.%s" % (self._namespace, name) + def _normalize_tags(self, tags): + if tags is None: + return None + if isinstance(tags, list): + return tags + if isinstance(tags, dict): + return ["{}:{}".format(k, tags[k]) for k in tags] + raise TypeError("Could not normalize tags") + def start(self): self._client.start(flush_interval=self._flush_interval, roll_up_interval=self._flush_interval) def increment(self, name, count=1, **kwargs): + if 'tags' in kwargs: + kwargs['tags'] = self._normalize_tags(kwargs['tags']) self._client.increment(self._prefix_name(name), count, host=self._host, **kwargs) def gauge(self, name, count, **kwargs): + if 'tags' in kwargs: + kwargs['tags'] = self._normalize_tags(kwargs['tags']) self._client.gauge(self._prefix_name(name), count, host=self._host, **kwargs) def timing(self, name, duration, **kwargs): + if 'tags' in kwargs: + kwargs['tags'] = self._normalize_tags(kwargs['tags']) self._client.timing(self._prefix_name(name), value=duration, host=self._host, **kwargs) diff --git a/autopush/router/apnsrouter.py b/autopush/router/apnsrouter.py index ee188cd1..cc0d89b6 100644 --- a/autopush/router/apnsrouter.py +++ b/autopush/router/apnsrouter.py @@ -170,6 +170,9 @@ def _route(self, notification, router_data): router_data["rel_channel"], self._base_tags ) + self.metrics.gauge("notification.message_data", + len(notification.data or ""), + tags={'destination': 'Direct'}) return RouterResponse(status_code=201, response_body="", headers={"TTL": notification.ttl, "Location": location}, diff --git a/autopush/router/fcm.py b/autopush/router/fcm.py index 12bb02a8..5fe1c04b 100644 --- a/autopush/router/fcm.py +++ b/autopush/router/fcm.py @@ -265,6 +265,9 @@ def _process_reply(self, reply, notification, router_data, ttl): ) self.metrics.increment("updates.client.bridge.fcm.succeeded", self._base_tags) + self.metrics.gauge("notification.message_data", + len(notification.data or ""), + tags={'destination': 'Direct'}) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/gcm.py b/autopush/router/gcm.py index 02593989..161915d9 100644 --- a/autopush/router/gcm.py +++ b/autopush/router/gcm.py @@ -189,6 +189,9 @@ def _process_reply(self, reply, uaid_data, ttl, notification): self.metrics.increment("updates.client.bridge.gcm.succeeded", self._base_tags) + self.metrics.gauge("notification.message_data", + len(notification.data or ""), + tags={'destination': 'Direct'}) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.version) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/router/simple.py b/autopush/router/simple.py index 34314d00..083a42a9 100644 --- a/autopush/router/simple.py +++ b/autopush/router/simple.py @@ -61,9 +61,15 @@ def amend_endpoint_response(self, response, router_data): """Stubbed out for this router""" def stored_response(self, notification): + self.metrics.gauge("notification.message_data", + len(notification.data or ""), + tags={'destination': 'Stored'}) return RouterResponse(202, "Notification Stored") def delivered_response(self, notification): + self.metrics.gauge("notification.message_data", + len(notification.data or ""), + tags={'destination': 'Direct'}) return RouterResponse(200, "Delivered") @inlineCallbacks diff --git a/autopush/router/webpush.py b/autopush/router/webpush.py index c4c26bc7..aef9ef81 100644 --- a/autopush/router/webpush.py +++ b/autopush/router/webpush.py @@ -24,6 +24,9 @@ class WebPushRouter(SimpleRouter): """SimpleRouter subclass to store individual messages appropriately""" def delivered_response(self, notification): + self.metrics.gauge("notification.message_data", + len(notification.data or ""), + tags={'destination': 'Stored'}) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", @@ -32,6 +35,9 @@ def delivered_response(self, notification): logged_status=200) def stored_response(self, notification): + self.metrics.gauge("notification.message_data", + len(notification.data or ""), + tags={'destination': 'Direct'}) location = "%s/m/%s" % (self.ap_settings.endpoint_url, notification.location) return RouterResponse(status_code=201, response_body="", diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index fd0fe897..c2aa9004 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -13,7 +13,7 @@ from distutils.spawn import find_executable from StringIO import StringIO from httplib import HTTPResponse # noqa -from mock import Mock, call +from mock import Mock, call, patch from unittest.case import SkipTest from zope.interface import implementer @@ -47,7 +47,7 @@ from autopush.main import ConnectionApplication, EndpointApplication from autopush.settings import AutopushSettings from autopush.utils import base64url_encode -from autopush.metrics import SinkMetrics +from autopush.metrics import SinkMetrics, DatadogMetrics from autopush.tests.support import TestingLogObserver from autopush.websocket import PushServerFactory @@ -675,8 +675,9 @@ def test_webpush_data_delivery_to_connected_client(self): )) yield self.shut_down(client) + @patch("autopush.metrics.datadog") @inlineCallbacks - def test_webpush_data_delivery_to_disconnected_client(self): + def test_webpush_data_delivery_to_disconnected_client(self, m_ddog): tests = { "d248d4e0-0ef4-41d9-8db5-2533ad8e4041": dict( data=b"\xe2\x82\x28\xf0\x28\x8c\xbc", result="4oIo8CiMvA"), @@ -688,6 +689,11 @@ def test_webpush_data_delivery_to_disconnected_client(self): "6c33e055-5762-47e5-b90c-90ad9bfe3f53": dict( data=b"\xc3\x28\xa0\xa1\xe2\x28\xa1", result="wyigoeIooQ"), } + # Piggy back a check for stored source metrics + self.conn.db.metrics = DatadogMetrics( + "someapikey", "someappkey", namespace="testpush", + hostname="localhost") + self.conn.db.metrics._client = Mock() client = Client("ws://localhost:9010/", use_webpush=True) yield client.connect() @@ -716,6 +722,8 @@ def test_webpush_data_delivery_to_disconnected_client(self): ok_(self.logs.logged_ci(lambda ci: 'message_size' in ci), "message_size not logged") + eq_(self.conn.db.metrics._client.gauge.call_args[1]['tags'], + ['source:Stored']) yield self.shut_down(client) @inlineCallbacks diff --git a/autopush/tests/test_metrics.py b/autopush/tests/test_metrics.py index 8e10d704..7d41e2cb 100644 --- a/autopush/tests/test_metrics.py +++ b/autopush/tests/test_metrics.py @@ -68,3 +68,25 @@ def test_basic(self, mock_dog): m.timing("lifespan", 113) m._client.timing.assert_called_with("testpush.lifespan", value=113, host=hostname) + + @patch("autopush.metrics.datadog") + def test_normalize_dicts(self, mock_dog): + m = DatadogMetrics("someapikey", "someappkey", namespace="testpush", + hostname="localhost") + ok_(len(mock_dog.mock_calls) > 0) + m._client = Mock() + m.start() + + m.increment("dict", 1, tags={"a": 1, "b": 2}) + m._client.increment.call_args[1]['tags'].sort() + eq_(m._client.increment.call_args[1]['tags'], ['a:1', 'b:2']) + m._client.increment.reset_mock() + m.gauge("list", 1, tags=["a:1", "b:2"]) + m._client.gauge.call_args[1]['tags'].sort() + eq_(m._client.gauge.call_args[1]['tags'], ['a:1', 'b:2']) + m._client.gauge.reset_mock() + m.increment("none", 1) + m.timing("alsonone", 1, tags=None) + self.assertRaises(TypeError, + m.increment, + "bad", 1, tags=1) diff --git a/autopush/tests/test_web_base.py b/autopush/tests/test_web_base.py index f9a518fd..e29f5ac1 100644 --- a/autopush/tests/test_web_base.py +++ b/autopush/tests/test_web_base.py @@ -17,6 +17,8 @@ from autopush.http import EndpointHTTPFactory from autopush.exceptions import InvalidRequest from autopush.settings import AutopushSettings +from autopush.metrics import SinkMetrics +from autopush.tests.support import test_db dummy_request_id = "11111111-1234-1234-1234-567812345678" dummy_uaid = str(uuid.UUID("abad1dea00000000aabbccdd00000000")) @@ -58,7 +60,8 @@ def setUp(self): host='example.com:8080') self.base = BaseWebHandler( - EndpointHTTPFactory(settings, db=None, routers=None), + EndpointHTTPFactory(settings, db=test_db(SinkMetrics()), + routers=None), self.request_mock ) self.status_mock = self.base.set_status = Mock() diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index 388e0abc..6f8a6f8a 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -126,6 +126,8 @@ def setUp(self): self.proto._log_exc = False self.proto.log = Mock(spec=Logger) + self.proto.debug = True + self.proto.sendMessage = self.send_mock = Mock() self.orig_close = self.proto.sendClose request_mock = Mock(spec=ConnectionRequest) @@ -222,7 +224,6 @@ def test_nuke_connection(self, mock_reactor): self.proto.state = "" self.proto.ps.uaid = uuid.uuid4().hex self.proto.nukeConnection() - ok_(self.proto.metrics.increment.called) @patch("autopush.websocket.reactor") def test_nuke_connection_shutdown_ran(self, mock_reactor): @@ -264,17 +265,6 @@ def test_base_tags(self): 'ua_browser_family:Firefox', 'host:example.com:8080'])) - def test_reporter(self): - self.metrics.reset_mock() - self.factory.periodic_reporter(self.metrics) - - # Verify metric increase of nothing - calls = self.metrics.method_calls - eq_(len(calls), 4) - name, args, _ = calls[0] - eq_(name, "gauge") - eq_(args, ("update.client.writers", 0)) - def test_handshake_sub(self): self.factory.externalPort = 80 @@ -435,10 +425,10 @@ def test_close_with_delivery_cleanup_and_get_no_result(self): # Close the connection self.proto.onClose(True, None, None) - yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2) - eq_(len(self.metrics.mock_calls), 3) - self.metrics.increment.assert_called_with( - "client.notify_uaid_failure", tags=None) + yield self._wait_for(lambda: len(self.metrics.mock_calls) > 0) + eq_(self.metrics.timing.call_args[0][0], 'ua.connection.lifespan') + # Wait for final cleanup (no error or metric produced) + yield sleep(1) @inlineCallbacks def test_close_with_delivery_cleanup_and_get_uaid_error(self): @@ -462,10 +452,9 @@ def raise_item(*args, **kwargs): # Close the connection self.proto.onClose(True, None, None) - yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2) - eq_(len(self.metrics.mock_calls), 3) - self.metrics.increment.assert_called_with( - "client.lookup_uaid_failure", tags=None) + yield self._wait_for(lambda: self.proto.log.info.called) + # Wait for final cleanup (no error or metric produced) + yield sleep(1) @inlineCallbacks def test_close_with_delivery_cleanup_and_no_node_id(self): @@ -1766,6 +1755,7 @@ def test_notif_finished_with_too_many_messages(self): d = Deferred() def check(*args, **kwargs): + eq_(self.metrics.gauge.call_args[1]['tags'], ["source:Direct"]) ok_(self.proto.force_retry.called) ok_(self.send_mock.called) d.callback(True) diff --git a/autopush/utils.py b/autopush/utils.py index 99d8cbca..a2e9d517 100644 --- a/autopush/utils.py +++ b/autopush/utils.py @@ -270,6 +270,7 @@ class WebPushNotification(object): timestamp = attrib(default=Factory(lambda: int(time.time()))) # type: int sortkey_timestamp = attrib(default=None) # type: Optional[int] topic = attrib(default=None) # type: Optional[str] + source = attrib(default="Direct") # type: Optional[str] message_id = attrib(default=None) # type: str @@ -460,7 +461,8 @@ def from_message_table(cls, uaid, item): message_id=key_info["message_id"], update_id=item.get("updateid"), timestamp=item.get("timestamp"), - sortkey_timestamp=key_info.get("sortkey_timestamp") + sortkey_timestamp=key_info.get("sortkey_timestamp"), + source="Stored" ) # Ensure we generate the sort-key properly for legacy messges diff --git a/autopush/web/base.py b/autopush/web/base.py index cce2425d..cc01e11c 100644 --- a/autopush/web/base.py +++ b/autopush/web/base.py @@ -267,10 +267,11 @@ def _router_response(self, response): errno=response.errno or 999, message=response.response_body) - def _router_fail_err(self, fail): + def _router_fail_err(self, fail, router_type=None, vapid=None): """errBack for router failures""" fail.trap(RouterException) exc = fail.value + success = False if exc.log_exception: if exc.status_code >= 500: fmt = fail.value.message or 'Exception' @@ -283,12 +284,26 @@ def _router_fail_err(self, fail): self.log.debug(format="Success", status_code=exc.status_code, logged_status=exc.logged_status or 0, client_info=self._client_info) + success = True + self.metrics.increment('notification.message.success', + tags=['destination:"Direct"', + 'router:{}'.format(router_type), + 'vapid:{}'.format(vapid is not + None)] + ) elif 400 <= exc.status_code < 500: self.log.debug(format="Client error", status_code=exc.status_code, logged_status=exc.logged_status or 0, errno=exc.errno or 0, client_info=self._client_info) + if not success: + self.metrics.increment('notification.message.error', + tags=dict( + code=exc.status_code, + router=router_type, + vapid=(vapid is True) + )) self._router_response(exc) def _write_validation_err(self, errors): diff --git a/autopush/web/registration.py b/autopush/web/registration.py index f13ae4ec..c85b64b5 100644 --- a/autopush/web/registration.py +++ b/autopush/web/registration.py @@ -337,8 +337,7 @@ def post(self, router_type, router_data): Router type/data registration. """ - self.metrics.increment("updates.client.register", - tags=self.base_tags()) + self.metrics.increment("ua.command.register", tags=self.base_tags()) uaid = uuid.uuid4() @@ -455,7 +454,7 @@ class ChannelRegistrationHandler(BaseRegistrationHandler): @threaded_validate(UnregisterChidSchema) def delete(self, uaid, chid): # type: (uuid.UUID, str) -> Deferred - self.metrics.increment("updates.client.unregister", + self.metrics.increment("ua.command.unregister", tags=self.base_tags()) d = deferToThread(self._delete_channel, uaid, chid) d.addCallback(self._success) diff --git a/autopush/web/simplepush.py b/autopush/web/simplepush.py index 6f5cc469..7dedd51f 100644 --- a/autopush/web/simplepush.py +++ b/autopush/web/simplepush.py @@ -135,7 +135,7 @@ def _router_completed(self, response, uaid_data, warning=""): self.log.debug(format="Router miss, message stored.", client_info=self._client_info) time_diff = time.time() - self._start_time - self.metrics.timing("updates.handled", duration=time_diff) + self.metrics.timing("notification.request_time", duration=time_diff) response.response_body = ( response.response_body + " " + warning).strip() self._router_response(response) diff --git a/autopush/web/webpush.py b/autopush/web/webpush.py index f97db29a..aa9edfc2 100644 --- a/autopush/web/webpush.py +++ b/autopush/web/webpush.py @@ -466,15 +466,21 @@ def post(self, version=notification.version, encoding=encoding, ) - router = self.routers[user_data["router_type"]] + router_type = user_data["router_type"] + router = self.routers[router_type] self._router_time = time.time() d = maybeDeferred(router.route_notification, notification, user_data) - d.addCallback(self._router_completed, user_data, "") - d.addErrback(self._router_fail_err) + d.addCallback(self._router_completed, user_data, "", + router_type=router_type, + vapid=jwt) + d.addErrback(self._router_fail_err, + router_type=router_type, + vapid=jwt) d.addErrback(self._response_err) return d - def _router_completed(self, response, uaid_data, warning=""): + def _router_completed(self, response, uaid_data, warning="", + router_type=None, vapid=None): """Called after router has completed successfully""" # Log the time taken for routing self._timings["route_time"] = time.time() - self._router_time @@ -502,10 +508,13 @@ def _router_completed(self, response, uaid_data, warning=""): d.addCallback(lambda x: self._router_completed( response, uaid_data, - warning)) + warning, + router_type, + vapid)) return d else: # No changes are requested by the bridge system, proceed as normal + dest = 'Direct' if response.status_code == 200 or response.logged_status == 200: self.log.debug(format="Successful delivery", client_info=self._client_info) @@ -513,7 +522,16 @@ def _router_completed(self, response, uaid_data, warning=""): self.log.debug( format="Router miss, message stored.", client_info=self._client_info) - self.metrics.timing("updates.handled", duration=time_diff) + dest = 'Stored' + self.metrics.timing("notification.request_time", + duration=time_diff) + self.metrics.increment('notification.message.success', + tags=dict( + destination=dest, + router=router_type, + vapid=(vapid is True)) + ) + response.response_body = ( response.response_body + " " + warning).strip() self._router_response(response) diff --git a/autopush/websocket.py b/autopush/websocket.py index 737f0704..32aa8e04 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -244,9 +244,6 @@ def __attrs_post_init__(self): if self.stats.host: self._base_tags.append("host:%s" % self.stats.host) - self.db.metrics.increment("client.socket.connect", - tags=self._base_tags or None) - # Message table rotation initial settings self.message_month = self.db.current_msg_month @@ -434,13 +431,9 @@ def _sendAutoPing(self): """Override for sanity checking during auto-ping interval""" if not self.ps.uaid: # No uaid yet, drop the connection - self.metrics.increment("client.autoping.no_uaid", - tags=self.base_tags) self.sendClose() elif self.factory.clients.get(self.ps.uaid) != self: # UAID, but we're not in clients anymore for some reason - self.metrics.increment("client.autoping.invalid_client", - tags=self.base_tags) self.sendClose() return WebSocketServerProtocol._sendAutoPing(self) @@ -457,14 +450,7 @@ def nukeConnection(self): still hadn't run by this point""" # Did onClose get called? If so, we shutdown properly, no worries. if hasattr(self, "_shutdown_ran"): - self.metrics.increment("client.success.sendClose", - tags=self.base_tags) return - - # Uh-oh, we have not been shut-down properly, report detailed data - self.metrics.increment("client.error.sendClose_failed", - tags=self.base_tags) - self.transport.abortConnection() @log_exception @@ -586,9 +572,8 @@ def onClose(self, wasClean, code, reason): def cleanUp(self, wasClean, code, reason): """Thorough clean-up method to cancel all remaining deferreds, and send connection metrics in""" - self.metrics.increment("client.socket.disconnect", tags=self.base_tags) elapsed = (ms_time() - self.ps.connected_at) / 1000.0 - self.metrics.timing("client.socket.lifespan", duration=elapsed, + self.metrics.timing("ua.connection.lifespan", duration=elapsed, tags=self.base_tags) self.ps.stats.connection_time = int(elapsed) @@ -659,8 +644,6 @@ def _trap_uaid_not_found(self, fail): # type: (failure.Failure) -> None """Traps UAID not found error""" fail.trap(ItemNotFound) - self.metrics.increment("client.lookup_uaid_failure", - tags=self.base_tags) def _notify_node(self, result): """Checks the result of lookup node to send the notify if the client is @@ -837,9 +820,10 @@ def _verify_user_record(self): self.log.debug(format="Dropping User", code=104, uaid_hash=self.ps.uaid_hash, uaid_record=dump_uaid(record)) + tags = ['code:104'] + tags.extend(self.base_tags or []) + self.metrics.increment("ua.expiration", tags=tags) self.force_retry(self.db.router.drop_user, self.ps.uaid) - self.metrics.increment("client.drop_user", - tags={"errno": 104}) return None # Validate webpush records @@ -852,8 +836,9 @@ def _verify_user_record(self): uaid_record=dump_uaid(record)) self.force_retry(self.db.router.drop_user, self.ps.uaid) - self.metrics.increment("client.drop_user", - tags={"errno": 105}) + tags = list('code:105') + tags.extend(self.base_tags or []) + self.metrics.increment("ua.expiration", tags=tags) return None # Determine if message table rotation is needed @@ -942,7 +927,7 @@ def finish_hello(self, previous): self.sendJSON(msg) self.log.debug(format="hello", uaid_hash=self.ps.uaid_hash, **self.ps.raw_agent) - self.metrics.increment("updates.client.hello", tags=self.base_tags) + self.metrics.increment("ua.command.hello", tags=self.base_tags) self.process_notifications() def process_notifications(self): @@ -1120,8 +1105,10 @@ def finish_webpush_notifications(self, result): if self.sent_notification_count > self.ap_settings.msg_limit: raise MessageOverloadException() if notif.topic: - self.metrics.increment("updates.notification.topic", + self.metrics.increment("notification.topic", tags=self.base_tags) + self.metrics.gauge('ua.message_data', len(msg.get('data', '')), + tags=["source:{}".format(notif.source)]) self.sendJSON(msg) # Did we send any messages? @@ -1194,7 +1181,6 @@ def error_monthly_rotation_overload(self, fail): def _send_ping(self): """Helper for ping sending that tracks when the ping was sent""" self.ps.last_ping = time.time() - self.metrics.increment("updates.client.ping", tags=self.base_tags) return self.sendMessage("{}", False) def process_ping(self): @@ -1270,7 +1256,7 @@ def send_register_finish(self, result, endpoint, chid): "status": 200 } self.sendJSON(msg) - self.metrics.increment("updates.client.register", tags=self.base_tags) + self.metrics.increment("ua.command.register", tags=self.base_tags) self.ps.stats.registers += 1 self.log.debug(format="Register", channel_id=chid, endpoint=endpoint, @@ -1288,7 +1274,7 @@ def process_unregister(self, data): except ValueError: return self.bad_message("unregister", "Invalid ChannelID") - self.metrics.increment("updates.client.unregister", + self.metrics.increment("ua.command.unregister", tags=self.base_tags) self.ps.stats.unregisters += 1 event = dict(format="Unregister", channel_id=chid, @@ -1451,7 +1437,7 @@ def process_ack(self, data): if not updates or not isinstance(updates, list): return - self.metrics.increment("updates.client.ack", tags=self.base_tags) + self.metrics.increment("ua.command.ack", tags=self.base_tags) defers = filter(None, map(self.ack_update, updates)) if defers: @@ -1471,6 +1457,9 @@ def process_nack(self, data): self.log.debug(format="Nack", uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, message_id=str(version), code=code, **self.ps.raw_agent) + tags = ["code:401"] + tags.extend(self.base_tags or []) + self.metrics.increment('ua.command.nack', tags=tags) self.ps.stats.nacks += 1 def check_missed_notifications(self, results, resume=False): @@ -1559,17 +1548,6 @@ def __init__(self, ap_settings, db, agent, clients): closeHandshakeTimeout=ap_settings.close_handshake_timeout, ) - def periodic_reporter(self, metrics): - # type: (IMetrics) -> None - """Twisted Task function that runs every few seconds to emit general - metrics regarding twisted and client counts. - - """ - metrics.gauge("update.client.writers", len(reactor.getWriters())) - metrics.gauge("update.client.readers", len(reactor.getReaders())) - metrics.gauge("update.client.connections", len(self.clients)) - metrics.gauge("update.client.ws_connections", self.countConnections) - class RouterHandler(BaseHandler): """Router Handler @@ -1587,20 +1565,17 @@ def put(self, uaid): client = self.application.clients.get(uaid) if not client: self.set_status(404, reason=None) - self.metrics.increment("updates.router.disconnected") self.write("Client not connected.") return if client.paused: self.set_status(503, reason=None) - self.metrics.increment("updates.router.busy") self.write("Client busy.") return update = json.loads(self.request.body) client.send_notification(update) - self.metrics.increment("updates.router.received") self.write("Client accepted for delivery") @@ -1616,7 +1591,6 @@ def put(self, uaid, *args): client = self.application.clients.get(uaid) if not client: self.set_status(404, reason=None) - self.metrics.increment("updates.notification.disconnected") self.write("Client not connected.") return @@ -1624,13 +1598,12 @@ def put(self, uaid, *args): # Client already busy waiting for stuff, flag for check client._check_notifications = True self.set_status(202) - self.metrics.increment("updates.notification.flagged") self.write("Flagged for Notification check") return # Client is online and idle, start a notification check client.process_notifications() - self.metrics.increment("updates.notification.checking") + self.metrics.increment("ua.notification_check") self.write("Notification check started") def delete(self, uaid, connected_at):