Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
WIP: Add modern metrics
Browse files Browse the repository at this point in the history
Closes #943
  • Loading branch information
jrconlin committed Jun 30, 2017
1 parent 2594f9e commit 1e331fe
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 85 deletions.
14 changes: 1 addition & 13 deletions autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,7 @@ def track_provisioned(func):
def wrapper(self, *args, **kwargs):
if TRACK_DB_CALLS:
DB_CALLS.append(func.__name__)
try:
return func(self, *args, **kwargs)
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.%s" % func.__name__)
raise
except JSONResponseError:
self.metrics.increment("error.jsonresponse.%s" % func.__name__)
raise
except BotoServerError:
self.metrics.increment("error.botoserver.%s" % func.__name__)
raise
return func(self, *args, **kwargs)
return wrapper


Expand Down Expand Up @@ -433,7 +423,6 @@ def delete_notification(self, uaid, chid, version=None):
chid=normalize_id(chid))
return True
except ProvisionedThroughputExceededException:
self.metrics.increment("error.provisioned.delete_notification")
return False


Expand Down Expand Up @@ -678,7 +667,6 @@ def get_uaid(self, uaid):
# We unfortunately have to catch this here, as track_provisioned
# will not see this, since JSONResponseError is a subclass and
# will capture it
self.metrics.increment("error.provisioned.get_uaid")
raise
except JSONResponseError: # pragma: nocover
# We trap JSONResponseError because Moto returns text instead of
Expand Down
1 change: 0 additions & 1 deletion autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ def add_websocket(self):
self.clients)
site_factory = self.websocket_site_factory(settings, ws_factory)
self.add_maybe_ssl(settings.port, site_factory, site_factory.ssl_cf())
self.add_timer(1.0, ws_factory.periodic_reporter, self.db.metrics)

@classmethod
def from_argparse(cls, ns):
Expand Down
27 changes: 4 additions & 23 deletions autopush/tests/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ def test_nuke_connection(self, mock_reactor):
self.proto.state = ""
self.proto.ps.uaid = uuid.uuid4().hex
self.proto.nukeConnection()
ok_(self.proto.metrics.increment.called)

@patch("autopush.websocket.reactor")
def test_nuke_connection_shutdown_ran(self, mock_reactor):
Expand Down Expand Up @@ -261,17 +260,6 @@ def test_base_tags(self):
'ua_browser_family:Firefox',
'host:example.com:8080']))

def test_reporter(self):
self.metrics.reset_mock()
self.factory.periodic_reporter(self.metrics)

# Verify metric increase of nothing
calls = self.metrics.method_calls
eq_(len(calls), 4)
name, args, _ = calls[0]
eq_(name, "gauge")
eq_(args, ("update.client.writers", 0))

def test_handshake_sub(self):
self.factory.externalPort = 80

Expand Down Expand Up @@ -414,7 +402,7 @@ def test_close_with_delivery_cleanup_using_webpush(self):
yield self._wait_for(lambda: self.mock_agent.mock_calls)
self.flushLoggedErrors()

@inlineCallbacks
#@inlineCallbacks
def test_close_with_delivery_cleanup_and_get_no_result(self):
self._connect()
self.proto.ps.uaid = uuid.uuid4().hex
Expand All @@ -428,16 +416,12 @@ def test_close_with_delivery_cleanup_and_get_no_result(self):
self.proto.db.storage.save_notification = Mock()
self.proto.db.router.get_uaid = mock_get = Mock()
mock_get.return_value = False
self.metrics.reset_mock()

# Close the connection
self.proto.onClose(True, None, None)
yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2)
eq_(len(self.metrics.mock_calls), 3)
self.metrics.increment.assert_called_with(
"client.notify_uaid_failure", tags=None)
#yield self._wait_for_close()

@inlineCallbacks
#@inlineCallbacks
def test_close_with_delivery_cleanup_and_get_uaid_error(self):
self._connect()
self.proto.ps.uaid = uuid.uuid4().hex
Expand All @@ -459,10 +443,7 @@ def raise_item(*args, **kwargs):

# Close the connection
self.proto.onClose(True, None, None)
yield self._wait_for(lambda: len(self.metrics.mock_calls) > 2)
eq_(len(self.metrics.mock_calls), 3)
self.metrics.increment.assert_called_with(
"client.lookup_uaid_failure", tags=None)
#yield self._wait_for_close()

@inlineCallbacks
def test_close_with_delivery_cleanup_and_no_node_id(self):
Expand Down
5 changes: 2 additions & 3 deletions autopush/web/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,7 @@ def post(self, router_type, router_data):
Router type/data registration.
"""
self.metrics.increment("updates.client.register",
tags=self.base_tags())
self.metrics.increment("ua.command.register", tags=self.base_tags())

uaid = uuid.uuid4()

Expand Down Expand Up @@ -455,7 +454,7 @@ class ChannelRegistrationHandler(BaseRegistrationHandler):
@threaded_validate(UnregisterChidSchema)
def delete(self, uaid, chid):
# type: (uuid.UUID, str) -> Deferred
self.metrics.increment("updates.client.unregister",
self.metrics.increment("ua.command.unregister",
tags=self.base_tags())
d = deferToThread(self._delete_channel, uaid, chid)
d.addCallback(self._success)
Expand Down
2 changes: 1 addition & 1 deletion autopush/web/simplepush.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _router_completed(self, response, uaid_data, warning=""):
self.log.debug(format="Router miss, message stored.",
client_info=self._client_info)
time_diff = time.time() - self._start_time
self.metrics.timing("updates.handled", duration=time_diff)
self.metrics.timing("notification.request_time", duration=time_diff)
response.response_body = (
response.response_body + " " + warning).strip()
self._router_response(response)
2 changes: 1 addition & 1 deletion autopush/web/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ def _router_completed(self, response, uaid_data, warning=""):
self.log.debug(
format="Router miss, message stored.",
client_info=self._client_info)
self.metrics.timing("updates.handled", duration=time_diff)
self.metrics.timing("notification.request_time", duration=time_diff)
response.response_body = (
response.response_body + " " + warning).strip()
self._router_response(response)
50 changes: 7 additions & 43 deletions autopush/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,6 @@ def __init__(self, db, request):
if host:
self._base_tags.append("host:%s" % host)

db.metrics.increment("client.socket.connect",
tags=self._base_tags or None)

self._should_stop = False
self._paused = False
self.uaid = None
Expand Down Expand Up @@ -453,13 +450,9 @@ def _sendAutoPing(self):
"""Override for sanity checking during auto-ping interval"""
if not self.ps.uaid:
# No uaid yet, drop the connection
self.metrics.increment("client.autoping.no_uaid",
tags=self.base_tags)
self.sendClose()
elif self.factory.clients.get(self.ps.uaid) != self:
# UAID, but we're not in clients anymore for some reason
self.metrics.increment("client.autoping.invalid_client",
tags=self.base_tags)
self.sendClose()
return WebSocketServerProtocol._sendAutoPing(self)

Expand All @@ -476,14 +469,7 @@ def nukeConnection(self):
still hadn't run by this point"""
# Did onClose get called? If so, we shutdown properly, no worries.
if hasattr(self, "_shutdown_ran"):
self.metrics.increment("client.success.sendClose",
tags=self.base_tags)
return

# Uh-oh, we have not been shut-down properly, report detailed data
self.metrics.increment("client.error.sendClose_failed",
tags=self.base_tags)

self.transport.abortConnection()

@log_exception
Expand Down Expand Up @@ -605,9 +591,8 @@ def onClose(self, wasClean, code, reason):
def cleanUp(self, wasClean, code, reason):
"""Thorough clean-up method to cancel all remaining deferreds, and send
connection metrics in"""
self.metrics.increment("client.socket.disconnect", tags=self.base_tags)
elapsed = (ms_time() - self.ps.connected_at) / 1000.0
self.metrics.timing("client.socket.lifespan", duration=elapsed,
self.metrics.timing("ua.connection.lifespan", duration=elapsed,
tags=self.base_tags)
self.ps.stats.connection_time = int(elapsed)

Expand Down Expand Up @@ -678,8 +663,6 @@ def _trap_uaid_not_found(self, fail):
# type: (failure.Failure) -> None
"""Traps UAID not found error"""
fail.trap(ItemNotFound)
self.metrics.increment("client.lookup_uaid_failure",
tags=self.base_tags)

def _notify_node(self, result):
"""Checks the result of lookup node to send the notify if the client is
Expand Down Expand Up @@ -857,8 +840,6 @@ def _verify_user_record(self):
uaid_hash=self.ps.uaid_hash,
uaid_record=dump_uaid(record))
self.force_retry(self.db.router.drop_user, self.ps.uaid)
self.metrics.increment("client.drop_user",
tags={"errno": 104})
return None

# Validate webpush records
Expand Down Expand Up @@ -961,7 +942,7 @@ def finish_hello(self, previous):
self.sendJSON(msg)
self.log.debug(format="hello", uaid_hash=self.ps.uaid_hash,
**self.ps.raw_agent)
self.metrics.increment("updates.client.hello", tags=self.base_tags)
self.metrics.increment("ua.command.hello", tags=self.base_tags)
self.process_notifications()

def process_notifications(self):
Expand Down Expand Up @@ -1139,7 +1120,7 @@ def finish_webpush_notifications(self, result):
if self.sent_notification_count > self.ap_settings.msg_limit:
raise MessageOverloadException()
if notif.topic:
self.metrics.increment("updates.notification.topic",
self.metrics.increment("notification.topic",
tags=self.base_tags)
self.sendJSON(msg)

Expand Down Expand Up @@ -1213,7 +1194,6 @@ def error_monthly_rotation_overload(self, fail):
def _send_ping(self):
"""Helper for ping sending that tracks when the ping was sent"""
self.ps.last_ping = time.time()
self.metrics.increment("updates.client.ping", tags=self.base_tags)
return self.sendMessage("{}", False)

def process_ping(self):
Expand Down Expand Up @@ -1289,7 +1269,7 @@ def send_register_finish(self, result, endpoint, chid):
"status": 200
}
self.sendJSON(msg)
self.metrics.increment("updates.client.register", tags=self.base_tags)
self.metrics.increment("ua.command.register", tags=self.base_tags)
self.ps.stats.registers += 1
self.log.debug(format="Register", channel_id=chid,
endpoint=endpoint,
Expand All @@ -1307,7 +1287,7 @@ def process_unregister(self, data):
except ValueError:
return self.bad_message("unregister", "Invalid ChannelID")

self.metrics.increment("updates.client.unregister",
self.metrics.increment("ua.command.unregister",
tags=self.base_tags)
self.ps.stats.unregisters += 1
event = dict(format="Unregister", channel_id=chid,
Expand Down Expand Up @@ -1470,7 +1450,7 @@ def process_ack(self, data):
if not updates or not isinstance(updates, list):
return

self.metrics.increment("updates.client.ack", tags=self.base_tags)
self.metrics.increment("ua.command.ack", tags=self.base_tags)
defers = filter(None, map(self.ack_update, updates))

if defers:
Expand Down Expand Up @@ -1578,17 +1558,6 @@ def __init__(self, ap_settings, db, agent, clients):
closeHandshakeTimeout=ap_settings.close_handshake_timeout,
)

def periodic_reporter(self, metrics):
# type: (IMetrics) -> None
"""Twisted Task function that runs every few seconds to emit general
metrics regarding twisted and client counts.
"""
metrics.gauge("update.client.writers", len(reactor.getWriters()))
metrics.gauge("update.client.readers", len(reactor.getReaders()))
metrics.gauge("update.client.connections", len(self.clients))
metrics.gauge("update.client.ws_connections", self.countConnections)


class RouterHandler(BaseHandler):
"""Router Handler
Expand All @@ -1606,20 +1575,17 @@ def put(self, uaid):
client = self.application.clients.get(uaid)
if not client:
self.set_status(404, reason=None)
self.metrics.increment("updates.router.disconnected")
self.write("Client not connected.")
return

if client.paused:
self.set_status(503, reason=None)

self.metrics.increment("updates.router.busy")
self.write("Client busy.")
return

update = json.loads(self.request.body)
client.send_notification(update)
self.metrics.increment("updates.router.received")
self.write("Client accepted for delivery")


Expand All @@ -1635,21 +1601,19 @@ def put(self, uaid, *args):
client = self.application.clients.get(uaid)
if not client:
self.set_status(404, reason=None)
self.metrics.increment("updates.notification.disconnected")
self.write("Client not connected.")
return

if client.paused:
# Client already busy waiting for stuff, flag for check
client._check_notifications = True
self.set_status(202)
self.metrics.increment("updates.notification.flagged")
self.write("Flagged for Notification check")
return

# Client is online and idle, start a notification check
client.process_notifications()
self.metrics.increment("updates.notification.checking")
self.metrics.increment("ua.notification_check")
self.write("Notification check started")

def delete(self, uaid, connected_at):
Expand Down

0 comments on commit 1e331fe

Please sign in to comment.