This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

feat: Use modern metrics
Closes #943
jrconlin committed Jul 6, 2017
1 parent 40f15e0 commit 46c3489
Showing 18 changed files with 149 additions and 96 deletions.
16 changes: 2 additions & 14 deletions autopush/db.py
@@ -42,7 +42,7 @@
attrib,
Factory
)
-from boto.exception import JSONResponseError, BotoServerError
+from boto.exception import JSONResponseError
from boto.dynamodb2.exceptions import (
ConditionalCheckFailedException,
ItemNotFound,
@@ -298,17 +298,7 @@ def track_provisioned(func):
def wrapper(self, *args, **kwargs):
if TRACK_DB_CALLS:
DB_CALLS.append(func.__name__)
-try:
-return func(self, *args, **kwargs)
-except ProvisionedThroughputExceededException:
-self.metrics.increment("error.provisioned.%s" % func.__name__)
-raise
-except JSONResponseError:
-self.metrics.increment("error.jsonresponse.%s" % func.__name__)
-raise
-except BotoServerError:
-self.metrics.increment("error.botoserver.%s" % func.__name__)
-raise
+return func(self, *args, **kwargs)
return wrapper


@@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None):
chid=normalize_id(chid))
return True
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.delete_notification")
return False


@@ -678,7 +667,6 @@ def get_uaid(self, uaid):
# We unfortunately have to catch this here, as track_provisioned
# will not see this, since JSONResponseError is a subclass and
# will capture it
self.metrics.increment("error.provisioned.get_uaid")
raise
except JSONResponseError: # pragma: nocover
# We trap JSONResponseError because Moto returns text instead of
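
Aside (not part of the commit): with the per-exception counters removed, track_provisioned reduces to a thin pass-through that only records call names when TRACK_DB_CALLS is set; provisioned-throughput and Boto errors now simply propagate to the caller. A minimal standalone sketch of the resulting decorator, using stand-ins for the module-level flags in autopush/db.py:

from functools import wraps

TRACK_DB_CALLS = False  # stand-ins for the module-level flags in autopush/db.py
DB_CALLS = []

def track_provisioned(func):
    # Record the call name when tracking is enabled, then call straight
    # through; DynamoDB errors are no longer counted here and bubble up.
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if TRACK_DB_CALLS:
            DB_CALLS.append(func.__name__)
        return func(self, *args, **kwargs)
    return wrapper

class Demo(object):
    @track_provisioned
    def fetch(self):
        return "ok"

assert Demo().fetch() == "ok"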

1 change: 0 additions & 1 deletion autopush/main.py
@@ -260,7 +260,6 @@ def add_websocket(self):
self.clients)
site_factory = self.websocket_site_factory(settings, ws_factory)
self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf())
-self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics)

@classmethod
def from_argparse(cls, ns):

15 changes: 15 additions & 0 deletions autopush/metrics.py
@@ -89,19 +89,34 @@ def __init__(self, api_key, app_key, hostname, flush_interval=10,
def _prefix_name(self, name):
return "%s.%s" % (self._namespace, name)

+def _normalize_tags(self, tags):
+if tags is None:
+return None
+if isinstance(tags, list):
+return tags
+if isinstance(tags, dict):
+return ["{}:{}".format(k, tags[k]) for k in tags]
+raise TypeError("Could not normalize tags")

def start(self):
self._client.start(flush_interval=self._flush_interval,
roll_up_interval=self._flush_interval)

def increment(self, name, count=1, **kwargs):
+if 'tags' in kwargs:
+kwargs['tags'] = self._normalize_tags(kwargs['tags'])
self._client.increment(self._prefix_name(name), count, host=self._host,
**kwargs)

def gauge(self, name, count, **kwargs):
+if 'tags' in kwargs:
+kwargs['tags'] = self._normalize_tags(kwargs['tags'])
self._client.gauge(self._prefix_name(name), count, host=self._host,
**kwargs)

def timing(self, name, duration, **kwargs):
+if 'tags' in kwargs:
+kwargs['tags'] = self._normalize_tags(kwargs['tags'])
self._client.timing(self._prefix_name(name), value=duration,
host=self._host, **kwargs)
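
Aside (not part of the diff): a short sketch of what the new tag handling does in practice. It patches out the datadog module so nothing is sent anywhere, and shows that dict tags are normalized to key:value strings, list tags pass through unchanged, anything else raises TypeError, and metric names still get the configured namespace prefix. The namespace "autopush" and the tag values below are illustrative, not taken from the commit.

from mock import Mock, patch

from autopush.metrics import DatadogMetrics

# Patch the datadog module so constructing DatadogMetrics stays offline.
with patch("autopush.metrics.datadog"):
    m = DatadogMetrics("apikey", "appkey", namespace="autopush",
                       hostname="localhost")
m._client = Mock()  # record calls instead of flushing them to Datadog

# Dict tags are converted to "key:value" strings, and names get the prefix.
m.gauge("notification.message_data", 17, tags={"destination": "Direct"})
m._client.gauge.assert_called_with("autopush.notification.message_data", 17,
                                   host="localhost",
                                   tags=["destination:Direct"])

# List tags pass through unchanged; calls without tags are untouched.
m.increment("updates.client.register", tags=["ua_os_family:Windows"])
m.timing("ua.connection.lifespan", 42)

# Anything that is neither None, a list, nor a dict is rejected.
try:
    m.increment("bad", tags=1)
except TypeError:
    pass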


3 changes: 3 additions & 0 deletions autopush/router/apnsrouter.py
@@ -170,6 +170,9 @@ def _route(self, notification, router_data):
router_data["rel_channel"],
self._base_tags
)
+self.metrics.gauge("notification.message_data",
+len(notification.data or ""),
+tags={'destination': 'Direct'})
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},

3 changes: 3 additions & 0 deletions autopush/router/fcm.py
@@ -265,6 +265,9 @@ def _process_reply(self, reply, notification, router_data, ttl):
)
self.metrics.increment("updates.client.bridge.fcm.succeeded",
self._base_tags)
+self.metrics.gauge("notification.message_data",
+len(notification.data or ""),
+tags={'destination': 'Direct'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",

3 changes: 3 additions & 0 deletions autopush/router/gcm.py
@@ -189,6 +189,9 @@ def _process_reply(self, reply, uaid_data, ttl, notification):

self.metrics.increment("updates.client.bridge.gcm.succeeded",
self._base_tags)
+self.metrics.gauge("notification.message_data",
+len(notification.data or ""),
+tags={'destination': 'Direct'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",

6 changes: 6 additions & 0 deletions autopush/router/simple.py
@@ -61,9 +61,15 @@ def amend_endpoint_response(self, response, router_data):
"""Stubbed out for this router"""

def stored_response(self, notification):
+self.metrics.gauge("notification.message_data",
+len(notification.data or ""),
+tags={'destination': 'Stored'})
return RouterResponse(202, "Notification Stored")

def delivered_response(self, notification):
+self.metrics.gauge("notification.message_data",
+len(notification.data or ""),
+tags={'destination': 'Direct'})
return RouterResponse(200, "Delivered")

@inlineCallbacks

6 changes: 6 additions & 0 deletions autopush/router/webpush.py
@@ -24,6 +24,9 @@ class WebPushRouter(SimpleRouter):
"""SimpleRouter subclass to store individual messages appropriately"""

def delivered_response(self, notification):
+self.metrics.gauge("notification.message_data",
+len(notification.data or ""),
+tags={'destination': 'Stored'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
@@ -32,6 +35,9 @@ def delivered_response(self, notification):
logged_status=200)

def stored_response(self, notification):
+self.metrics.gauge("notification.message_data",
+len(notification.data or ""),
+tags={'destination': 'Direct'})
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",

14 changes: 11 additions & 3 deletions autopush/tests/test_integration.py
@@ -13,7 +13,7 @@
from distutils.spawn import find_executable
from StringIO import StringIO
from httplib import HTTPResponse # noqa
-from mock import Mock, call
+from mock import Mock, call, patch
from unittest.case import SkipTest

from zope.interface import implementer
@@ -47,7 +47,7 @@
from autopush.main import ConnectionApplication, EndpointApplication
from autopush.settings import AutopushSettings
from autopush.utils import base64url_encode
-from autopush.metrics import SinkMetrics
+from autopush.metrics import SinkMetrics, DatadogMetrics
from autopush.tests.support import TestingLogObserver
from autopush.websocket import PushServerFactory

@@ -675,8 +675,9 @@ def test_webpush_data_delivery_to_connected_client(self):
))
yield self.shut_down(client)

+@patch("autopush.metrics.datadog")
@inlineCallbacks
-def test_webpush_data_delivery_to_disconnected_client(self):
+def test_webpush_data_delivery_to_disconnected_client(self, m_ddog):
tests = {
"d248d4e0-0ef4-41d9-8db5-2533ad8e4041": dict(
data=b"\xe2\x82\x28\xf0\x28\x8c\xbc", result="4oIo8CiMvA"),
@@ -688,6 +689,11 @@ def test_webpush_data_delivery_to_disconnected_client(self):
"6c33e055-5762-47e5-b90c-90ad9bfe3f53": dict(
data=b"\xc3\x28\xa0\xa1\xe2\x28\xa1", result="wyigoeIooQ"),
}
+# Piggy back a check for stored source metrics
+self.conn.db.metrics = DatadogMetrics(
+"someapikey", "someappkey", namespace="testpush",
+hostname="localhost")
+self.conn.db.metrics._client = Mock()

client = Client("ws://localhost:9010/", use_webpush=True)
yield client.connect()
@@ -716,6 +722,8 @@ def test_webpush_data_delivery_to_disconnected_client(self):

ok_(self.logs.logged_ci(lambda ci: 'message_size' in ci),
"message_size not logged")
+eq_(self.conn.db.metrics._client.gauge.call_args[1]['tags'],
+['source:Stored'])
yield self.shut_down(client)

@inlineCallbacks

22 changes: 22 additions & 0 deletions autopush/tests/test_metrics.py
@@ -68,3 +68,25 @@ def test_basic(self, mock_dog):
m.timing("lifespan", 113)
m._client.timing.assert_called_with("testpush.lifespan", value=113,
host=hostname)

+@patch("autopush.metrics.datadog")
+def test_normalize_dicts(self, mock_dog):
+m = DatadogMetrics("someapikey", "someappkey", namespace="testpush",
+hostname="localhost")
+ok_(len(mock_dog.mock_calls) > 0)
+m._client = Mock()
+m.start()
+
+m.increment("dict", 1, tags={"a": 1, "b": 2})
+m._client.increment.call_args[1]['tags'].sort()
+eq_(m._client.increment.call_args[1]['tags'], ['a:1', 'b:2'])
+m._client.increment.reset_mock()
+m.gauge("list", 1, tags=["a:1", "b:2"])
+m._client.gauge.call_args[1]['tags'].sort()
+eq_(m._client.gauge.call_args[1]['tags'], ['a:1', 'b:2'])
+m._client.gauge.reset_mock()
+m.increment("none", 1)
+m.timing("alsonone", 1, tags=None)
+self.assertRaises(TypeError,
+m.increment,
+"bad", 1, tags=1)

5 changes: 4 additions & 1 deletion autopush/tests/test_web_base.py
@@ -17,6 +17,8 @@
from autopush.http import EndpointHTTPFactory
from autopush.exceptions import InvalidRequest
from autopush.settings import AutopushSettings
+from autopush.metrics import SinkMetrics
+from autopush.tests.support import test_db

dummy_request_id = "11111111-1234-1234-1234-567812345678"
dummy_uaid = str(uuid.UUID("abad1dea00000000aabbccdd00000000"))
@@ -58,7 +60,8 @@ def setUp(self):
host='example.com:8080')

self.base = BaseWebHandler(
-EndpointHTTPFactory(settings, db=None, routers=None),
+EndpointHTTPFactory(settings, db=test_db(SinkMetrics()),
+routers=None),
self.request_mock
)
self.status_mock = self.base.set_status = Mock()

30 changes: 10 additions & 20 deletions autopush/tests/test_websocket.py
@@ -126,6 +126,8 @@ def setUp(self):
self.proto._log_exc = False
self.proto.log = Mock(spec=Logger)

+self.proto.debug = True

self.proto.sendMessage = self.send_mock = Mock()
self.orig_close = self.proto.sendClose
request_mock = Mock(spec=ConnectionRequest)
@@ -222,7 +224,6 @@ def test_nuke_connection(self, mock_reactor):
self.proto.state = ""
self.proto.ps.uaid = uuid.uuid4().hex
self.proto.nukeConnection()
-ok_(self.proto.metrics.increment.called)

@patch("autopush.websocket.reactor")
def test_nuke_connection_shutdown_ran(self, mock_reactor):
@@ -264,17 +265,6 @@ def test_base_tags(self):
'ua_browser_family:Firefox',
'host:example.com:8080']))

-def test_reporter(self):
-self.metrics.reset_mock()
-self.factory.periodic_reporter(self.metrics)
-
-# Verify metric increase of nothing
-calls = self.metrics.method_calls
-eq_(len(calls), 4)
-name, args, _ = calls[0]
-eq_(name, "gauge")
-eq_(args, ("update.client.writers", 0))

def test_handshake_sub(self):
self.factory.externalPort = 80

@@ -435,10 +425,10 @@ def test_close_with_delivery_cleanup_and_get_no_result(self):

# Close the connection
self.proto.onClose(True, None, None)
-yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2)
-eq_(len(self.metrics.mock_calls), 3)
-self.metrics.increment.assert_called_with(
-"client.notify_uaid_failure", tags=None)
+yield self._wait_for(lambda: len(self.metrics.mock_calls) > 0)
+eq_(self.metrics.timing.call_args[0][0], 'ua.connection.lifespan')
+# Wait for final cleanup (no error or metric produced)
+yield sleep(1)

@inlineCallbacks
def test_close_with_delivery_cleanup_and_get_uaid_error(self):
@@ -462,10 +452,9 @@ def raise_item(*args, **kwargs):

# Close the connection
self.proto.onClose(True, None, None)
-yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2)
-eq_(len(self.metrics.mock_calls), 3)
-self.metrics.increment.assert_called_with(
-"client.lookup_uaid_failure", tags=None)
+yield self._wait_for(lambda: self.proto.log.info.called)
+# Wait for final cleanup (no error or metric produced)
+yield sleep(1)

@inlineCallbacks
def test_close_with_delivery_cleanup_and_no_node_id(self):
@@ -1766,6 +1755,7 @@ def test_notif_finished_with_too_many_messages(self):
d = Deferred()

def check(*args, **kwargs):
+eq_(self.metrics.gauge.call_args[1]['tags'], ["source:Direct"])
ok_(self.proto.force_retry.called)
ok_(self.send_mock.called)
d.callback(True)

4 changes: 3 additions & 1 deletion autopush/utils.py
@@ -270,6 +270,7 @@ class WebPushNotification(object):
timestamp = attrib(default=Factory(lambda: int(time.time()))) # type: int
sortkey_timestamp = attrib(default=None) # type: Optional[int]
topic = attrib(default=None) # type: Optional[str]
+source = attrib(default="Direct") # type: Optional[str]

message_id = attrib(default=None) # type: str

@@ -460,7 +461,8 @@ def from_message_table(cls, uaid, item):
message_id=key_info["message_id"],
update_id=item.get("updateid"),
timestamp=item.get("timestamp"),
-sortkey_timestamp=key_info.get("sortkey_timestamp")
+sortkey_timestamp=key_info.get("sortkey_timestamp"),
+source="Stored"
)

# Ensure we generate the sort-key properly for legacy messges

17 changes: 16 additions & 1 deletion autopush/web/base.py
@@ -267,10 +267,11 @@ def _router_response(self, response):
errno=response.errno or 999,
message=response.response_body)

-def _router_fail_err(self, fail):
+def _router_fail_err(self, fail, router_type=None, vapid=None):
"""errBack for router failures"""
fail.trap(RouterException)
exc = fail.value
+success = False
if exc.log_exception:
if exc.status_code >= 500:
fmt = fail.value.message or 'Exception'
@@ -283,12 +284,26 @@ def _router_fail_err(self, fail):
self.log.debug(format="Success", status_code=exc.status_code,
logged_status=exc.logged_status or 0,
client_info=self._client_info)
+success = True
+self.metrics.increment('notification.message.success',
+tags=['destination:"Direct"',
+'router:{}'.format(router_type),
+'vapid:{}'.format(vapid is not
+None)]
+)
elif 400 <= exc.status_code < 500:
self.log.debug(format="Client error",
status_code=exc.status_code,
logged_status=exc.logged_status or 0,
errno=exc.errno or 0,
client_info=self._client_info)
+if not success:
+self.metrics.increment('notification.message.error',
+tags=dict(
+code=exc.status_code,
+router=router_type,
+vapid=(vapid is True)
+))
self._router_response(exc)

def _write_validation_err(self, errors):
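
Context, not part of the diff: the success branch above passes metrics.increment a ready-made list of key:value strings, while the error branch passes a plain dict and relies on the _normalize_tags helper added in autopush/metrics.py to produce the same shape. A small sketch of the dict form (tag values here are illustrative):

def normalize_tags(tags):
    # Mirror of DatadogMetrics._normalize_tags, trimmed to the dict case.
    if isinstance(tags, dict):
        return ["{}:{}".format(k, tags[k]) for k in tags]
    return tags

error_tags = normalize_tags(dict(code=410, router="webpush", vapid=False))
assert sorted(error_tags) == ["code:410", "router:webpush", "vapid:False"]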

5 changes: 2 additions & 3 deletions autopush/web/registration.py
@@ -337,8 +337,7 @@ def post(self, router_type, router_data):
Router type/data registration.
"""
self.metrics.increment("updates.client.register",
tags=self.base_tags())
self.metrics.increment("ua.command.register", tags=self.base_tags())

uaid = uuid.uuid4()

@@ -455,7 +454,7 @@ class ChannelRegistrationHandler(BaseRegistrationHandler):
@threaded_validate(UnregisterChidSchema)
def delete(self, uaid, chid):
# type: (uuid.UUID, str) -> Deferred
self.metrics.increment("updates.client.unregister",
self.metrics.increment("ua.command.unregister",
tags=self.base_tags())
d = deferToThread(self._delete_channel, uaid, chid)
d.addCallback(self._success)