diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index dff22909..8abe9a0d 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -82,6 +82,11 @@ def logged_ci(self, predicate): return self.logged( lambda e: 'client_info' in e and predicate(e['client_info'])) + def logged_session(self): + """Extract the last logged session""" + return filter(lambda e: e["log_format"] == "Session", + self._events)[-1] + def setUp(): logging.getLogger('boto').setLevel(logging.CRITICAL) @@ -516,6 +521,9 @@ def test_delivery_while_disconnected(self): eq_(len(result["updates"]), 1) eq_(result["updates"][0]["channelID"], chan) yield self.shut_down(client) + log_event = self.logs.logged_session() + eq_(log_event["connection_type"], "simplepush") + eq_(log_event["direct_acked"], 0) @inlineCallbacks def test_delivery_repeat_without_ack(self): @@ -546,6 +554,10 @@ def test_direct_delivery_without_ack(self): result = yield client.send_notification() ok_(result != {}) yield client.disconnect() + log_event = self.logs.logged_session() + eq_(log_event["direct_acked"], 0) + eq_(log_event["direct_storage"], 1) + yield client.connect() yield client.hello() result2 = yield client.get_notification(timeout=5) @@ -571,12 +583,19 @@ def test_dont_deliver_acked(self): eq_(update["channelID"], chan) yield client.ack(chan, update["version"]) yield client.disconnect() + log_event = self.logs.logged_session() + eq_(log_event["connection_type"], "simplepush") + eq_(log_event["stored_acked"], 1) + time.sleep(0.2) yield client.connect() yield client.hello() result = yield client.get_notification() eq_(result, None) yield self.shut_down(client) + log_event = self.logs.logged_session() + eq_(log_event["connection_type"], "simplepush") + eq_(log_event["stored_acked"], 0) @inlineCallbacks def test_no_delivery_to_unregistered(self): @@ -820,6 +839,9 @@ def test_basic_delivery(self): eq_(result["data"], base64url_encode(data)) eq_(result["messageType"], "notification") yield self.shut_down(client) + log_event = self.logs.logged_session() + eq_(log_event["connection_type"], "webpush") + eq_(log_event["direct_storage"], 1) @inlineCallbacks def test_topic_basic_delivery(self): diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index c5ee2ad3..e6973073 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -59,6 +59,7 @@ dummy_chid = uuid.uuid4() dummy_chid_str = str(dummy_chid) dummy_uaid = uuid.uuid4() +dummy_uaid_str = dummy_uaid.hex def dummy_notif(**kwargs): @@ -340,7 +341,7 @@ def test_bad_json(self): def test_no_messagetype_after_hello(self): self._connect() - self.proto.ps.uaid = "asdf" + self.proto.ps.uaid = dummy_uaid_str self._send_message(dict(data="wassup")) def check_result(close_args): @@ -353,7 +354,7 @@ def check_result(close_args): def test_unknown_messagetype(self): self._connect() - self.proto.ps.uaid = "asdf" + self.proto.ps.uaid = dummy_uaid_str self._send_message(dict(messageType="wassup")) def check_result(close_args): @@ -366,8 +367,8 @@ def check_result(close_args): def test_close_with_cleanup(self): self._connect() - self.proto.ps.uaid = "asdf" - self.proto.ap_settings.clients["asdf"] = self.proto + self.proto.ps.uaid = dummy_uaid_str + self.proto.ap_settings.clients[dummy_uaid_str] = self.proto # Stick a mock on notif_mock = Mock() @@ -380,8 +381,8 @@ def test_close_with_cleanup(self): def test_close_with_delivery_cleanup(self): self._connect() - self.proto.ps.uaid = uuid.uuid4().hex - self.proto.ap_settings.clients["asdf"] = self.proto + self.proto.ps.uaid = dummy_uaid_str + self.proto.ap_settings.clients[dummy_uaid_str] = self.proto chid = str(uuid.uuid4()) # Stick an un-acked direct notification in diff --git a/autopush/websocket.py b/autopush/websocket.py index e147a933..ab551c71 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -36,6 +36,11 @@ from functools import partial, wraps from random import randrange +import attr +from attr import ( + attrs, + attrib +) from autobahn.twisted.resource import WebSocketResource from autobahn.twisted.websocket import ( WebSocketServerFactory, @@ -139,6 +144,40 @@ def wrapper(self, *args, **kwargs): return wrapper +@attrs(slots=True) +class SessionStatistics(object): + """Websocket Session Statistics + + Tracks statistics about the session that are logged when the websocket + session has been closed. + + """ + # User data + uaid_hash = attrib(default="") # type: str + uaid_reset = attrib(default=False) # type: bool + existing_uaid = attrib(default=False) # type: bool + connection_type = attrib(default="") # type: str + host = attrib(default="") # type: str + ua_os_family = attrib(default="") # type: str + ua_os_ver = attrib(default="") # type: str + ua_browser_family = attrib(default="") # type: str + ua_browser_ver = attrib(default="") # type: str + connection_time = attrib(default=0) # type: int + + # Usage data + direct_acked = attrib(default=0) # type: int + direct_storage = attrib(default=0) # type: int + stored_retrieved = attrib(default=0) # type: int + stored_acked = attrib(default=0) # type: int + nacks = attrib(default=0) # type: int + unregisters = attrib(default=0) # type: int + registers = attrib(default=0) # type: int + + def logging_data(self): + # type: () -> Dict[str, Any] + return attr.asdict(self) + + @implementer(IProducer) class PushState(object): @@ -149,9 +188,8 @@ class PushState(object): '_should_stop', '_paused', 'metrics', - 'uaid', - 'uaid_obj', - 'uaid_hash', + '_uaid_obj', + '_uaid_hash', 'raw_agent', 'last_ping', 'check_storage', @@ -160,6 +198,7 @@ class PushState(object): 'wake_data', 'connected_at', 'settings', + 'stats', # Table rotation 'message_month', @@ -179,16 +218,12 @@ class PushState(object): 'direct_updates', 'msg_limit', - 'reset_uaid', - - # iProducer methods - 'pauseProducing', - 'resumeProducing', - 'stopProducing', + '_reset_uaid', ] def __init__(self, settings, request): self._callbacks = [] + self.stats = SessionStatistics() self.settings = settings host = "" @@ -198,11 +233,14 @@ def __init__(self, settings, request): host = request.host else: self._user_agent = None + + self.stats.host = host self._base_tags = [] self.raw_agent = {} if self._user_agent: dd_tags, self.raw_agent = parse_user_agent(self._user_agent) for tag_name, tag_value in dd_tags.items(): + setattr(self.stats, tag_name, tag_value) self._base_tags.append("%s:%s" % (tag_name, tag_value)) if host: self._base_tags.append("host:%s" % host) @@ -212,9 +250,7 @@ def __init__(self, settings, request): self.metrics = settings.metrics self.metrics.increment("client.socket.connect", tags=self._base_tags or None) - self.uaid = None # Optional[str] - self.uaid_obj = None # Optional[uuid.UUID] - self.uaid_hash = "" + self.uaid = None self.last_ping = 0 self.check_storage = False self.use_webpush = False @@ -240,9 +276,11 @@ def __init__(self, settings, request): self._register = None # Reflects Notification's sent that haven't been ack'd + # This is simplepush style by default self.updates_sent = {} # Track Notification's we don't need to delete separately + # This is simplepush style by default self.direct_updates = {} # Whether this record should be reset after delivering stored @@ -257,8 +295,55 @@ def message(self): @property def user_agent(self): + # type: () -> str return self._user_agent or "None" + @property + def reset_uaid(self): + # type: () -> bool + return self._reset_uaid + + @reset_uaid.setter + def reset_uaid(self, value): + if value: + self._reset_uaid = True + self.stats.uaid_reset = True + else: + self._reset_uaid = False + + @property + def uaid_obj(self): + # type: () -> Optional[uuid.UUID] + return self._uaid_obj + + @property + def uaid_hash(self): + # type: () -> str + return self._uaid_hash + + @property + def uaid(self): + # type: () -> Optional[str] + return self._uaid_obj.hex if self._uaid_obj else None + + @uaid.setter + def uaid(self, value): + self._uaid_obj = uuid.UUID(value) if value else None + self._uaid_hash = hasher(value) if value else "" + self.stats.uaid_hash = self._uaid_hash + + def set_connection_type(self, conn_type): + """Set the connection type for the client""" + self.use_webpush = conn_type == "webpush" + self._base_tags.append("use_webpush:{}".format(self.use_webpush)) + self.router_type = conn_type + self.stats.connection_type = conn_type + + if conn_type == "webpush": + # Update our message tracking for webpush + self.updates_sent = defaultdict(lambda: []) + self.direct_updates = defaultdict(lambda: []) + def pauseProducing(self): """IProducer implementation tracking if we should pause output""" self._paused = True @@ -528,6 +613,7 @@ def cleanUp(self, wasClean, code, reason): elapsed = (ms_time() - self.ps.connected_at) / 1000.0 self.ps.metrics.timing("client.socket.lifespan", duration=elapsed, tags=self.base_tags) + self.ps.stats.connection_time = int(elapsed) # Cleanup our client entry if self.ps.uaid and self.ap_settings.clients.get(self.ps.uaid) == self: @@ -544,9 +630,12 @@ def cleanUp(self, wasClean, code, reason): if self.ps.use_webpush: for notifs in self.ps.direct_updates.values(): notifs = filter(lambda x: x.ttl != 0, notifs) + self.ps.stats.direct_storage += len(notifs) defers.extend(map(self._save_webpush_notif, notifs)) else: - for chid, version in self.ps.direct_updates.items(): + items = self.ps.direct_updates.items() + self.ps.stats.direct_storage += len(items) + for chid, version in items: defers.append(self._save_simple_notif(chid, version)) # Tag on the notifier once everything has been stored @@ -557,6 +646,9 @@ def cleanUp(self, wasClean, code, reason): del self.ps.direct_updates del self.ps.updates_sent + # Log out sessions stats + self.log.info("Session", **self.ps.stats.logging_data()) + def _save_webpush_notif(self, notif): """Save a direct_update webpush style notification""" return deferToThread( @@ -685,20 +777,14 @@ def process_hello(self, data): if self.ps.uaid: return self.returnError("hello", "duplicate hello", 401) - uaid = data.get("uaid") - self.ps.use_webpush = data.get("use_webpush", False) - self.ps._base_tags.append("use_webpush:%s" % - self.ps.use_webpush) - self.ps.router_type = "webpush" if self.ps.use_webpush\ - else "simplepush" - if self.ps.use_webpush: - self.ps.updates_sent = defaultdict(lambda: []) - self.ps.direct_updates = defaultdict(lambda: []) + conn_type = "webpush" if data.get("use_webpush", False) else \ + "simplepush" + self.ps.set_connection_type(conn_type) + uaid = data.get("uaid") existing_user, uaid = validate_uaid(uaid) self.ps.uaid = uaid - self.ps.uaid_obj = uuid.UUID(uaid) - self.ps.uaid_hash = hasher(uaid) + self.ps.stats.existing_uaid = existing_user # Check for the special wakeup commands if "wakeup_host" in data and "mobilenetwork" in data: wakeup_host = data.get("wakeup_host") @@ -736,6 +822,7 @@ def _register_user(self, existing_user=True): if not user_item: # No valid user record, consider this a new user self.ps.uaid = uuid.uuid4().hex + self.ps.stats.uaid_reset = True user_item = dict( uaid=self.ps.uaid, node_id=self.ap_settings.router_url, @@ -771,7 +858,7 @@ def _verify_user_record(self): # cases a record exists for some users that doesn't if "router_type" not in record or "connected_at" not in record: self.log.info(format="Dropping User", code=104, - uaid_hash=hasher(self.ps.uaid), + uaid_hash=self.ps.uaid_hash, uaid_record=dump_uaid(record)) self.force_retry(self.ap_settings.router.drop_user, self.ps.uaid) return None @@ -782,7 +869,7 @@ def _verify_user_record(self): if ("current_month" not in record) or record["current_month"] \ not in self.ps.settings.message_tables: self.log.info(format="Dropping User", code=105, - uaid_hash=hasher(self.ps.uaid), + uaid_hash=self.ps.uaid_hash, uaid_record=dump_uaid(record)) self.force_retry(self.ap_settings.router.drop_user, self.ps.uaid) @@ -966,6 +1053,7 @@ def finish_notifications(self, notifs): for s in notifs: chid = s['chid'] version = int(s['version']) + self.ps.stats.stored_retrieved += 1 if self._newer_notification_sent(chid, version): continue if chid in self.ps.direct_updates: @@ -1033,6 +1121,7 @@ def finish_webpush_notifications(self, result): now = int(time.time()) messages_sent = False for notif in notifs: + self.ps.stats.stored_retrieved += 1 # If the TTL is too old, don't deliver and fire a delete off if notif.expired(at_time=now): if not notif.sortkey_timestamp: @@ -1202,6 +1291,7 @@ def send_register_finish(self, result, endpoint, chid): self.sendJSON(msg) self.ps.metrics.increment("updates.client.register", tags=self.base_tags) + self.ps.stats.registers += 1 self.log.info(format="Register", channel_id=chid, endpoint=endpoint, uaid_hash=self.ps.uaid_hash, @@ -1220,8 +1310,8 @@ def process_unregister(self, data): self.ps.metrics.increment("updates.client.unregister", tags=self.base_tags) - - event = dict(format="Unregister", channelID=chid, + self.ps.stats.unregisters += 1 + event = dict(format="Unregister", channel_id=chid, uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, **self.ps.raw_agent) @@ -1287,6 +1377,7 @@ def ver_filter(notif): message_size=size, uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, **self.ps.raw_agent) + self.ps.stats.direct_acked += 1 self.ps.direct_updates[chid].remove(msg) return @@ -1301,6 +1392,7 @@ def ver_filter(notif): message_size=size, uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, **self.ps.raw_agent) + self.ps.stats.stored_acked += 1 if msg.sortkey_timestamp: # Is this the last un-acked message we're waiting for? @@ -1356,12 +1448,14 @@ def _handle_simple_ack(self, chid, version, code): uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, **self.ps.raw_agent) + self.ps.stats.direct_acked += 1 return self.log.info(format="Ack", router_key="simplepush", channel_id=chid, message_id=version, message_source="stored", uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, **self.ps.raw_agent) + self.ps.stats.stored_acked += 1 if chid in self.ps.updates_sent and \ self.ps.updates_sent[chid] <= version: del self.ps.updates_sent[chid] @@ -1397,6 +1491,7 @@ def process_nack(self, data): self.log.info(format="Nack", uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, message_id=version, code=code, **self.ps.raw_agent) + self.ps.stats.nacks += 1 def check_missed_notifications(self, results, resume=False): """Check to see if notifications were missed"""