From 945bc1776d2ea1eef701a3eb866766dc36a8dc8e Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Fri, 18 Mar 2016 16:59:24 -0700 Subject: [PATCH] feat: update logging for newstyle twisted and file output Updates all logging calls to use the newer logging utilities in twisted. This obsoletes the use of eliot, as twisteds logger can handle arbitrary keywords. The logging handling has been revamped into a more easily testable class that implements twisteds IObserver for easy logger observing. log_level support has been re-added so that calls can be appropriately scoped by logging type. Existing info/msg log calls now use appropriate log levels, with delayed function calls to prevent unnecessary overhead when formatting objects if the log message is ignored. Closes #419 --- .travis.yml | 6 +- autopush/db.py | 7 - autopush/endpoint.py | 66 +++++----- autopush/health.py | 6 +- autopush/logging.py | 204 +++++++++++++++-------------- autopush/main.py | 38 ++++-- autopush/router/apnsrouter.py | 12 +- autopush/router/gcm.py | 15 ++- autopush/router/simple.py | 15 ++- autopush/senderids.py | 15 ++- autopush/tests/test_endpoint.py | 51 ++------ autopush/tests/test_health.py | 6 +- autopush/tests/test_integration.py | 2 +- autopush/tests/test_logging.py | 56 +++++--- autopush/tests/test_router.py | 18 +-- autopush/tests/test_websocket.py | 49 +++---- autopush/utils.py | 13 +- autopush/websocket.py | 74 ++++++----- doc-requirements.txt | 1 - requirements.txt | 1 - test-requirements.txt | 1 - 21 files changed, 329 insertions(+), 327 deletions(-) diff --git a/.travis.yml b/.travis.yml index 24336d53..2255a54d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,8 @@ language: python sudo: false python: - - "pypy" + - "2.7" install: - - wget https://bitbucket.org/pypy/pypy/downloads/pypy-5.0.0-linux64.tar.bz2 - - tar xjvf pypy-5.0.0-linux64.tar.bz2 - - virtualenv -p pypy-5.0.0-linux64/bin/pypy pypy - - source pypy/bin/activate - make travis script: tox -- --with-coverage --cover-xml --cover-package=autopush after_success: diff --git a/autopush/db.py b/autopush/db.py index 6c45f650..ba8c2fc4 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -2,7 +2,6 @@ from __future__ import absolute_import import datetime -import logging import random import time import uuid @@ -21,9 +20,6 @@ from autopush.utils import generate_hash - -log = logging.getLogger(__file__) - key_hash = "" TRACK_DB_CALLS = False DB_CALLS = [] @@ -53,8 +49,6 @@ def hasher(uaid): def normalize_id(id): - if not id: - return id if (len(id) == 36 and id[8] == id[13] == id[18] == id[23] == '-'): return id.lower() @@ -434,7 +428,6 @@ def update_message(self, uaid, channel_id, message_id, ttl, data=None, else: expr += " REMOVE #dd, headers" expr_values = self.encode({":%s" % k: v for k, v in item.items()}) - log.debug(expr_values) conn.update_item( self.table.table_name, db_key, diff --git a/autopush/endpoint.py b/autopush/endpoint.py index 0677d95e..a3915479 100644 --- a/autopush/endpoint.py +++ b/autopush/endpoint.py @@ -38,7 +38,6 @@ from cryptography.fernet import InvalidToken from twisted.internet.defer import Deferred from twisted.internet.threads import deferToThread -from twisted.python import log from autopush.db import ( generate_last_connect, @@ -220,13 +219,13 @@ def _response_err(self, fail): running. """ - log.err(fail, **self._client_info) + self.log.failure(fail, **self._client_info) self._write_response(500, 999) def _overload_err(self, fail): """errBack for throughput provisioned exceptions""" fail.trap(ProvisionedThroughputExceededException) - log.msg("Throughput Exceeded", **self._client_info) + self.log.info("Throughput Exceeded", **self._client_info) self._write_response(503, 201) def _router_response(self, response): @@ -248,40 +247,40 @@ def _router_fail_err(self, fail): fail.trap(RouterException) exc = fail.value if exc.log_exception: - log.err(fail, **self._client_info) # pragma nocover + self.log.failure(fail, **self._client_info) # pragma nocover if 200 <= exc.status_code < 300: - log.msg("Success", status_code=exc.status_code, - logged_status=exc.logged_status or "", - **self._client_info) + self.log.info("Success", status_code=exc.status_code, + logged_status=exc.logged_status or "", + **self._client_info) elif 400 <= exc.status_code < 500: - log.msg("Client error", status_code=exc.status_code, - logged_status=exc.logged_status or "", - errno=exc.errno or "", - **self._client_info) + self.log.info("Client error", status_code=exc.status_code, + logged_status=exc.logged_status or "", + errno=exc.errno or "", + **self._client_info) self._router_response(exc) def _uaid_not_found_err(self, fail): """errBack for uaid lookup not finding the user""" fail.trap(ItemNotFound) - log.msg("UAID not found in AWS.", **self._client_info) + self.log.debug("UAID not found in AWS.", **self._client_info) self._write_response(410, 103) def _token_err(self, fail): """errBack for token decryption fail""" fail.trap(InvalidToken, InvalidTokenException) - log.msg("Invalid token", **self._client_info) + self.log.debug("Invalid token", **self._client_info) self._write_response(400, 102) def _auth_err(self, fail): """errBack for invalid auth token""" fail.trap(VapidAuthException) - log.msg("Invalid Auth token", **self._client_info) + self.log.debug("Invalid Auth token", **self._client_info) self._write_unauthorized_response(message=fail.value.message) def _chid_not_found_err(self, fail): """errBack for unknown chid""" fail.trap(ItemNotFound, ValueError) - log.msg("CHID not found in AWS.", **self._client_info) + self.log.debug("CHID not found in AWS.", **self._client_info) self._write_response(410, 106) ############################################################# @@ -307,7 +306,7 @@ def _invalid_auth(self, fail): message = fail.value.message or repr(fail.value) if isinstance(fail.value, AssertionError): message = "A decryption error occurred" - log.msg("Invalid bearer token: " + message, **self._client_info) + self.log.debug("Invalid bearer token: " + message, **self._client_info) raise VapidAuthException("Invalid bearer token: " + message) def _process_auth(self, result): @@ -380,7 +379,7 @@ def _delete_message(self, kind, uaid, chid): return d def _delete_completed(self, response): - log.msg("Message Deleted", status_code=204, **self._client_info) + self.log.info("Message Deleted", status_code=204, **self._client_info) self.set_status(204) self.finish() @@ -439,8 +438,8 @@ def _uaid_lookup_results(self, result): try: self.router = self.ap_settings.routers[router_key] except KeyError: - log.msg("Invalid router requested", status_code=400, errno=108, - **self._client_info) + self.log.debug("Invalid router requested", status_code=400, + errno=108, **self._client_info) return self._write_response(400, 108, message="Invalid router") @@ -457,13 +456,13 @@ def _uaid_lookup_results(self, result): req_fields = ["content-encoding", "encryption"] if data and not all([x in self.request.headers for x in req_fields]): - log.msg("Client error", status_code=400, errno=101, - **self._client_info) + self.log.debug("Client error", status_code=400, errno=101, + **self._client_info) return self._write_response(400, 101) if ("encryption-key" in self.request.headers and "crypto-key" in self.request.headers): - log.msg("Client error", status_code=400, errno=110, - **self._client_info) + self.log.debug("Client error", status_code=400, errno=110, + **self._client_info) return self._write_response( 400, 110, message="Invalid crypto headers") self._client_info["message_size"] = len(data) if data else 0 @@ -478,13 +477,13 @@ def _uaid_lookup_results(self, result): # Cap the TTL to our MAX_TTL ttl = min(ttl, MAX_TTL) else: - log.msg("Client error", status_code=400, - errno=112, **self._client_info) + self.log.debug("Client error", status_code=400, + errno=112, **self._client_info) return self._write_response(400, 112, message="Invalid TTL header") if data and len(data) > self.ap_settings.max_data: - log.msg("Client error", status_code=400, errno=104, - **self._client_info) + self.log.debug("Client error", status_code=400, errno=104, + **self._client_info) return self._write_response( 413, 104, message="Data payload too large") @@ -538,9 +537,10 @@ def _router_completed(self, response, uaid_data): return d else: if response.status_code == 200 or response.logged_status == 200: - log.msg("Successful delivery", **self._client_info) + self.log.info("Successful delivery", **self._client_info) elif response.status_code == 202 or response.logged_status == 202: - log.msg("Router miss, message stored.", **self._client_info) + self.log.info("Router miss, message stored.", + **self._client_info) time_diff = time.time() - self.start_time self.metrics.timing("updates.handled", duration=time_diff) self._router_response(response) @@ -592,7 +592,7 @@ def post(self, router_type="", router_token="", uaid="", chid=""): # normalize the path vars into parameters if router_type not in self.ap_settings.routers: - log.msg("Invalid router requested", **self._client_info) + self.log.debug("Invalid router requested", **self._client_info) return self._write_response( 400, 108, message="Invalid router") router = self.ap_settings.routers[router_type] @@ -645,7 +645,7 @@ def put(self, router_type="", router_token="", uaid="", chid=""): self.uaid = uaid router_data = params if router_type not in self.ap_settings.routers or not router_data: - log.msg("Invalid router requested", **self._client_info) + self.log.debug("Invalid router requested", **self._client_info) return self._write_response( 400, 108, message="Invalid router") router = self.ap_settings.routers[router_type] @@ -693,7 +693,7 @@ def delete(self, router_type="", router_token="", uaid="", chid=""): return self._write_unauthorized_response( message="Invalid Authentication") if router_type not in self.ap_settings.routers: - log.msg("Invalid router requested", **self._client_info) + self.log.debug("Invalid router requested", **self._client_info) return self._write_response( 400, 108, message="Invalid router") router = self.ap_settings.routers[router_type] @@ -764,7 +764,7 @@ def _return_endpoint(self, endpoint_data, new_uaid, router=None): else: msg = dict(channelID=self.chid, endpoint=endpoint_data[0]) self.write(json.dumps(msg)) - log.msg("Endpoint registered via HTTP", **self._client_info) + self.log.debug("Endpoint registered via HTTP", **self._client_info) self.finish() def _success(self, result): diff --git a/autopush/health.py b/autopush/health.py index 7d5dd238..00a88f4a 100644 --- a/autopush/health.py +++ b/autopush/health.py @@ -6,7 +6,7 @@ ) from twisted.internet.defer import DeferredList from twisted.internet.threads import deferToThread -from twisted.python import log +from twisted.logger import Logger from autopush import __version__ @@ -18,6 +18,8 @@ class MissingTableException(Exception): class HealthHandler(cyclone.web.RequestHandler): """HTTP Health Handler""" + log = Logger() + @cyclone.web.asynchronous def get(self): """HTTP Get @@ -54,7 +56,7 @@ def _check_success(self, result, name): def _check_error(self, failure, name): """Returns an error, and why""" self._healthy = False - log.err(failure, name) + self.log.failure(failure, name) cause = self._health_checks[name] = {"status": "NOT OK"} if failure.check(InternalServerError): diff --git a/autopush/logging.py b/autopush/logging.py index e1f2d7ba..b7935d45 100644 --- a/autopush/logging.py +++ b/autopush/logging.py @@ -1,120 +1,130 @@ """Custom Logging Setup - -This module sets up eliot structured logging, intercepts stdout output from -twisted, and pipes it through eliot for later processing into Kibana per -Mozilla Services standard structured logging. - """ -# TWISTED_LOG_MESSAGE and EliotObserver licensed under APL 2.0 from -# ClusterHQ/flocker -# https://github.com/ClusterHQ/flocker/blob/master/flocker/common/script.py#L81-L106 +import io import json -import os import pkg_resources import socket import sys import raven -from eliot import (add_destination, fields, - Logger, MessageType) from twisted.internet import reactor -from twisted.python.log import textFromEventDict, startLoggingWithObserver +from twisted.logger import ( + formatEvent, + formatEventAsClassicLogText, + globalLogPublisher, + LogLevel, + ILogObserver +) +from zope.interface import implementer HOSTNAME = socket.getfqdn() -LOGGER = None -TWISTED_LOG_MESSAGE = MessageType("twisted:log", - fields(error=bool, message=unicode), - u"A log message from Twisted.") -HUMAN = False - -class EliotObserver(object): - """A Twisted log observer that logs to Eliot""" - def __init__(self): - """Create the Eliot Observer""" - if os.environ.get("SENTRY_DSN"): +# A complete set of keys we don't include in Fields from a log event +IGNORED_KEYS = frozenset([ + "factory", + "failure", + "format", + "isError", + "log_format", + "log_flattened", + "log_level", + "log_legacy", + "log_logger", + "log_source", + "log_system", + "log_text", + "log_time", + "log_trace", + "message", + "message_type", + "severity", + "task_level", + "time", + "timestamp", + "type", + "why", +]) + + +@implementer(ILogObserver) +class PushLogger(object): + def __init__(self, logger_name, log_level="debug", log_format="json", + log_output="stdout", sentry_dsn=None): + self.logger_name = "-".join([ + logger_name, + pkg_resources.get_distribution("autopush").version + ]) + self._filename = None + self.log_level = LogLevel.lookupByName(log_level) + if log_output == "stdout": + self._output = sys.stdout + else: + self._filename = log_output + self._output = "file" + if log_format == "json": + self.format_event = self.json_format + else: + self.format_event = formatEventAsClassicLogText + if sentry_dsn: self.raven_client = raven.Client( release=raven.fetch_package_version("autopush")) else: self.raven_client = None - self.logger = Logger() - - def raven_log(self, event): - """Log out twisted exception failures to Raven""" - f = event['failure'] - # Throw back to event loop - reactor.callFromThread( - self.raven_client.captureException, - (f.type, f.value, f.getTracebackObject()) - ) - def __call__(self, msg): - """Called to log out messages""" - error = bool(msg.get("isError")) + def __call__(self, event): + if event["log_level"] < self.log_level: + return - if self.raven_client and 'failure' in msg: - self.raven_log(msg) - error = True + if self.raven_client and 'failure' in event: + f = event["failure"] + reactor.callFromThread( + self.raven_client.captureException, + (f.type, f.value, f.getTracebackObject()) + ) - # Twisted log messages on Python 2 are bytes. We don't know the - # encoding, but assume it's ASCII superset. Charmap will translate - # ASCII correctly, and higher-bit characters just map to - # corresponding Unicode code points, and will never fail at decoding. - message = unicode(textFromEventDict(msg), "charmap") - kw = msg.copy() - for key in ["message", "isError", "failure", "why", "format"]: - kw.pop(key, None) - TWISTED_LOG_MESSAGE(error=error, message=message, **kw).write( - self.logger) + text = self.format_event(event) + self._output.write(unicode(text)) + self._output.flush() - def start(self): - """Start capturing Twisted logs.""" - startLoggingWithObserver(self, setStdout=False) + def json_format(self, event): + error = bool(event.get("isError")) or "failure" in event + ts = event["log_time"] - -def stdout(message): - """Format a message appropriately for structured logging capture of stdout - and then write it to stdout""" - if HUMAN: - if message['error']: - sys.stdout.write("ERROR: %s\n" % message['message']) + if error: + severity = 3 else: - sys.stdout.write(" %s\n" % message['message']) - return - msg = {} - ts = message.pop("timestamp") - message.pop("time") - del message["task_level"] - msg["Hostname"] = HOSTNAME - if message["error"]: - msg["Severity"] = 3 - else: - msg["Severity"] = 5 - - # 'message_type' is used by twisted logging. Preserve this to the - # 'Type' tag. - if "message_type" in message: - msg["Type"] = message.pop("message_type") - - for key in ["Severity", "type", "severity"]: - if key in message: - msg[key.title()] = message.pop(key) + severity = 5 + + msg = { + "Hostname": HOSTNAME, + "Timestamp": ts * 1000 * 1000 * 1000, + "Type": "twisted:log", + "Severity": event.get("severity") or severity, + "EnvVersion": "2.0", + "Fields": {k: v for k, v in event.iteritems() + if k not in IGNORED_KEYS and + type(v) in (str, unicode, list, int, float)}, + "Logger": self.logger_name, + } + # Add the nicely formatted message + msg["Fields"]["message"] = formatEvent(event) + return json.dumps(msg, skipkeys=True) + "\n" - msg["Timestamp"] = ts * 1000 * 1000 * 1000 - msg["Fields"] = {k: v for k, v in message.items() - if not k.startswith("log_")} - msg["EnvVersion"] = "2.0" - msg["Logger"] = LOGGER - sys.stdout.write(json.dumps(msg, skipkeys=True) + "\n") - - -def setup_logging(logger_name, human=False): - """Patch in the Eliot logger and twisted log interception""" - global LOGGER, HUMAN - LOGGER = "-".join([logger_name, - pkg_resources.get_distribution("autopush").version]) - HUMAN = human - add_destination(stdout) - ellie = EliotObserver() - ellie.start() - return ellie + def start(self): + if self._filename: + self._output = io.open(self._filename, "a", encoding="utf-8") + globalLogPublisher.addObserver(self) + + def stop(self): + globalLogPublisher.removeObserver(self) + if self._filename: + self._output.close() + self._output = None + + @classmethod + def setup_logging(cls, logger_name, log_level="info", log_format="json", + log_output="stdout", sentry_dsn=None): + pl = cls(logger_name, log_level=log_level, log_format=log_format, + log_output=log_output, sentry_dsn=sentry_dsn) + pl.start() + return pl diff --git a/autopush/main.py b/autopush/main.py index e35e423f..00a3f931 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -1,21 +1,23 @@ """autopush/autoendpoint daemon scripts""" +import json +import os + import configargparse import cyclone.web -import json -import autopush.db as db -from autobahn.twisted.websocket import WebSocketServerFactory from autobahn.twisted.resource import WebSocketResource +from autobahn.twisted.websocket import WebSocketServerFactory from twisted.internet import reactor, task -from twisted.python import log +from twisted.logger import Logger from twisted.web.server import Site +import autopush.db as db from autopush.endpoint import ( EndpointHandler, MessageHandler, RegistrationHandler, ) from autopush.health import (HealthHandler, StatusHandler) -from autopush.logging import setup_logging +from autopush.logging import PushLogger from autopush.settings import AutopushSettings from autopush.ssl import AutopushSSLContextFactory from autopush.websocket import ( @@ -34,6 +36,7 @@ '~/.autopush_shared.ini', '.autopush_shared.ini', ] +log = Logger() def add_shared_args(parser): @@ -43,6 +46,10 @@ def add_shared_args(parser): dest='config_file', is_config_file=True) parser.add_argument('--debug', help='Debug Info.', action="store_true", default=False, env_var="DEBUG") + parser.add_argument('--log_level', help='Log level to log', type=str, + default="", env_var="LOG_LEVEL") + parser.add_argument('--log_output', help="Log output, stdout or filename", + default="stdout", env_var="LOG_OUTPUT") parser.add_argument('--crypto_key', help="Crypto key for tokens", default=[], env_var="CRYPTO_KEY", type=str, action="append") @@ -124,7 +131,6 @@ def obsolete_args(parser): These are included to prevent startup errors with old config files. """ - parser.add_argument('--log_level', help="OBSOLETE", type=int) parser.add_argument('--external_router', action="store_true", help='OBSOLETE') parser.add_argument('--max_message_size', type=int, help="OBSOLETE") @@ -297,7 +303,7 @@ def make_settings(args, **kwargs): # unaccounted. senderID = senderIDs.choose_ID() if senderID is None: - log.err("No GCM SenderIDs specified or found.") + log.critical("No GCM SenderIDs specified or found.") return router_conf["gcm"] = {"ttl": args.gcm_ttl, "dryrun": args.gcm_dryrun, @@ -350,7 +356,12 @@ def mount_health_handlers(site, settings): def connection_main(sysargs=None, use_files=True): """Main entry point to setup a connection node, aka the autopush script""" args, parser = _parse_connection(sysargs, use_files) - setup_logging("Autopush", args.human_logs) + log_format = "text" if args.human_logs else "json" + log_level = args.log_level or ("debug" if args.debug else "info") + sentry_dsn = bool(os.environ.get("SENTRY_DSN")) + PushLogger.setup_logging("Autopush", log_level=log_level, + log_format=log_format, log_output=args.log_output, + sentry_dsn=sentry_dsn) settings = make_settings( args, port=args.port, @@ -445,11 +456,16 @@ def endpoint_main(sysargs=None, use_files=True): if args.senderid_list: try: senderid_list = json.loads(args.senderid_list) - except (ValueError, TypeError), x: - log.err("Invalid JSON specified for senderid_list.", x) + except (ValueError, TypeError): + log.critical("Invalid JSON specified for senderid_list") return - setup_logging("Autoendpoint", args.human_logs) + log_level = args.log_level or ("debug" if args.debug else "info") + log_format = "text" if args.human_logs else "json" + sentry_dsn = bool(os.environ.get("SENTRY_DSN")) + PushLogger.setup_logging("Autoendpoint", log_level=log_level, + log_format=log_format, log_output=args.log_output, + sentry_dsn=sentry_dsn) settings = make_settings( args, diff --git a/autopush/router/apnsrouter.py b/autopush/router/apnsrouter.py index 7c4b4c6a..d09027db 100644 --- a/autopush/router/apnsrouter.py +++ b/autopush/router/apnsrouter.py @@ -2,7 +2,7 @@ import time import apns -from twisted.python import log +from twisted.logger import Logger from twisted.internet.threads import deferToThread from autopush.router.interface import RouterException, RouterResponse @@ -11,6 +11,7 @@ # https://github.com/djacobs/PyAPNs class APNSRouter(object): """APNS Router Implementation""" + log = Logger() apns = None messages = {} errors = {0: 'No error', @@ -40,7 +41,7 @@ def __init__(self, ap_settings, router_conf): self.default_title = router_conf.get("default_title", "SimplePush") self.default_body = router_conf.get("default_body", "New Alert") self._connect() - log.msg("Starting APNS router...") + self.log.debug("Starting APNS router...") def register(self, uaid, router_data, *kwargs): """Validate that an APNs instance token is in the ``router_data``""" @@ -86,12 +87,13 @@ def _route(self, notification, router_data): def _error(self, err): """Error handler""" if err['status'] == 0: - log.msg("Success") + self.log.debug("Success") del self.messages[err['identifier']] return - log.err("APNs Error encountered: %s" % self.errors[err['status']]) + self.log.debug("APNs Error encountered: {status}", + status=self.errors[err['status']]) if err['status'] in [1, 255]: - log.msg("Retrying...") + self.log.debug("Retrying...") self._connect() resend = self.messages.get(err.get('identifier')) if resend is None: diff --git a/autopush/router/gcm.py b/autopush/router/gcm.py index 78bbe4ce..12111e97 100644 --- a/autopush/router/gcm.py +++ b/autopush/router/gcm.py @@ -3,8 +3,8 @@ import json from base64 import urlsafe_b64encode -from twisted.python import log from twisted.internet.threads import deferToThread +from twisted.logger import Logger from autopush.router.interface import RouterException, RouterResponse from autopush.senderids import SenderIDs @@ -12,6 +12,7 @@ class GCMRouter(object): """GCM Router Implementation""" + log = Logger() gcm = None dryRun = 0 collapseKey = "simplepush" @@ -30,7 +31,7 @@ def __init__(self, ap_settings, router_conf): self.gcm = gcmclient.GCM(senderID.get("auth")) except: raise IOError("GCM Bridge not initiated in main") - log.msg("Starting GCM router...") + self.log.debug("Starting GCM router...") def check_token(self, token): if token not in self.senderIDs.senderIDs(): @@ -123,7 +124,7 @@ def _route(self, notification, router_data): def _error(self, err, status, **kwargs): """Error handler that raises the RouterException""" - log.err(err, **kwargs) + self.log.debug(err, **kwargs) return RouterException(err, status_code=status, response_body=err, **kwargs) @@ -133,14 +134,15 @@ def _process_reply(self, reply): # for reg_id, msg_id in reply.success.items(): # updates for old_id, new_id in reply.canonical.items(): - log.msg("GCM id changed : %s => " % old_id, new_id) + self.log.debug("GCM id changed : {old} => {new}", + old=old_id, new=new_id) return RouterResponse(status_code=503, response_body="Please try request again.", router_data=dict(token=new_id)) # naks: # uninstall: for reg_id in reply.not_registered: - log.msg("GCM no longer registered: %s" % reg_id) + self.log.debug("GCM no longer registered: %s" % reg_id) return RouterResponse( status_code=410, response_body="Endpoint requires client update", @@ -149,7 +151,8 @@ def _process_reply(self, reply): # for reg_id, err_code in reply.failed.items(): if len(reply.failed.items()) > 0: - log.msg("GCM failures: %s" % json.dumps(reply.failed.items())) + self.log.debug("GCM failures: {failed()}", + failed=lambda: json.dumps(reply.failed.items())) raise RouterException("GCM failure to deliver", status_code=503, response_body="Please try request later.") diff --git a/autopush/router/simple.py b/autopush/router/simple.py index a0e6b8ce..a92b1a59 100644 --- a/autopush/router/simple.py +++ b/autopush/router/simple.py @@ -26,7 +26,7 @@ ConnectionRefusedError, UserError ) -from twisted.python import log +from twisted.logger import Logger from twisted.web.client import FileBodyProducer from autopush.protocol import IgnoreBody @@ -48,6 +48,8 @@ class SimpleRouter(object): """Implements :class:`autopush.router.interface.IRouter` for internal routing to an Autopush node """ + log = Logger() + def __init__(self, ap_settings, router_conf): """Create a new SimpleRouter""" self.ap_settings = ap_settings @@ -107,7 +109,9 @@ def route_notification(self, notification, uaid_data): yield deferToThread(router.clear_node, uaid_data).addErrback(self._eat_db_err) if isinstance(exc, ConnectionRefusedError): - log.err("Could not route message: %s" % repr(exc)) + # Occurs if an IP record is now used by some other node + # in AWS. + self.log.debug("Could not route message: {exc}", exc=exc) if result and result.code == 200: self.metrics.increment("router.broadcast.hit") returnValue(self.delivered_response(notification)) @@ -159,7 +163,7 @@ def route_notification(self, notification, uaid_data): self.metrics.increment("updates.client.host_gone") dead_cache.put(node_key(node_id), True) if isinstance(exc, ConnectionRefusedError): - log.err("Could not route message: %s" % repr(exc)) + self.log.debug("Could not route message: {exc}", exc=exc) yield deferToThread( router.clear_node, uaid_data).addErrback(self._eat_db_err) @@ -181,8 +185,9 @@ def route_notification(self, notification, uaid_data): data=urlencode(self.udp["data"]), cert=self.conf.get("cert"), timeout=self.conf.get("server_timeout", 3))) - except Exception, x: - log.err("Could not send UDP wake request:", str(x)) + except Exception as exc: + self.log.debug("Could not send UDP wake request: {exc}", + exc=exc) returnValue(retVal) ############################################################# diff --git a/autopush/senderids.py b/autopush/senderids.py index e2c4953b..8d094d79 100644 --- a/autopush/senderids.py +++ b/autopush/senderids.py @@ -30,9 +30,9 @@ from boto.s3.connection import S3Connection from boto.s3.key import Key from boto.exception import S3ResponseError -from twisted.python import log from twisted.internet.threads import deferToThread from twisted.internet.task import LoopingCall +from twisted.logger import Logger # re-read from source every 15 minutes or so. SENDERID_EXPRY = 15*60 @@ -41,6 +41,7 @@ class SenderIDs(object): """Handle Read, Write and cache of SenderID values from S3""" + log = Logger() _expry = SENDERID_EXPRY _senderIDs = {} @@ -58,14 +59,14 @@ def __init__(self, args): self.service = LoopingCall(self._refresh) if senderIDs: if type(senderIDs) is not dict: - log.err("senderid_list is not a dict. Ignoring") + self.log.critical("senderid_list is not a dict. Ignoring") else: # We're initializing, so it's ok to block. self.update(senderIDs) def start(self): if self._use_s3: - log.msg("Starting SenderID service...") + self.log.debug("Starting SenderID service...") self.service.start(self._expry) def _write(self, senderIDs, *args): @@ -93,8 +94,8 @@ def _update_senderIDs(self, *args): candidates = json.loads(key.get_contents_as_string()) if candidates: if type(candidates) is not dict: - log.err("Wrong data type stored for senderIDs. " - "Should be dict. Ignoring.") + self.log.critical("Wrong data type stored for senderIDs. " + "Should be dict. Ignoring.") return return candidates @@ -116,7 +117,7 @@ def update(self, senderIDs): if not senderIDs: return if type(senderIDs) is not dict: - log.err("Wrong data type for senderIDs. Should be dict.") + self.log.critical("Wrong data type for senderIDs. Should be dict.") return if not self._use_s3: # Skip using s3 (For debugging) @@ -148,5 +149,5 @@ def choose_ID(self): def stop(self): if self.service and self.service.running: - log.msg("Stopping SenderID service...") + self.log.debug("Stopping SenderID service...") self.service.stop() diff --git a/autopush/tests/test_endpoint.py b/autopush/tests/test_endpoint.py index 183ca9e0..af0b305c 100644 --- a/autopush/tests/test_endpoint.py +++ b/autopush/tests/test_endpoint.py @@ -1,4 +1,3 @@ -import functools import json import sys import time @@ -65,24 +64,6 @@ def write(self, data): self.file.write(data) -def patch_logger(test): - """Replaces the Twisted error logger with a mock implementation. - - This uses Trial's ``patch()`` method instead of Mock's ``@patch`` - decorator. The latter still causes the test to print a stack trace - and fail unless ``flushLoggedErrors()`` is called. - - """ - @functools.wraps(test) - def wrapper(self, *args, **kwargs): - log_mock = Mock() - self.patch(endpoint, 'log', log_mock) - self.patch(utils, 'log', log_mock) - params = args + (log_mock,) - return test(self, *params, **kwargs) - return wrapper - - class MessageTestCase(unittest.TestCase): def setUp(self): twisted.internet.base.DelayedCall.debug = True @@ -185,6 +166,7 @@ class EndpointTestCase(unittest.TestCase): @patch('uuid.uuid4', return_value=uuid.UUID(dummy_request_id)) def setUp(self, t): + from twisted.logger import Logger # this timeout *should* be set to 0.5, however Travis runs # so slow, that many of these tests will time out leading # to false failure rates and integration tests generally @@ -220,6 +202,7 @@ def setUp(self, t): self.wp_router_mock = settings.routers["webpush"] self.status_mock = self.endpoint.set_status = Mock() self.write_mock = self.endpoint.write = Mock() + self.endpoint.log = Mock(spec=Logger) d = self.finish_deferred = Deferred() self.endpoint.finish = lambda: d.callback(True) @@ -517,15 +500,14 @@ def handle_finish(result): self.endpoint.put(None, '') return self.finish_deferred - @patch_logger - def test_put_token_error(self, log_mock): + def test_put_token_error(self): self.fernet_mock.configure_mock(**{ 'decrypt.side_effect': TypeError}) self.endpoint.request.body = b'version=123' def handle_finish(value): self.fernet_mock.decrypt.assert_called_with(b'') - eq_(log_mock.err.called, True) + eq_(self.endpoint.log.failure.called, True) self._assert_error_response(value) self.finish_deferred.addCallback(handle_finish) @@ -1054,9 +1036,6 @@ def handle_finish(result): return self.finish_deferred def test_post_webpush_with_logged_delivered(self): - import autopush.endpoint - log_patcher = patch.object(autopush.endpoint.log, "msg", spec=True) - mock_log = log_patcher.start() self.fernet_mock.decrypt.return_value = dummy_token self.endpoint.set_header = Mock() self.request_mock.headers["encryption"] = "stuff" @@ -1076,18 +1055,14 @@ def handle_finish(result): self.endpoint.set_status.assert_called_with(201) self.endpoint.set_header.assert_called_with( "Location", "Somewhere") - args, kwargs = mock_log.call_args + args, kwargs = self.endpoint.log.info.call_args eq_("Successful delivery", args[0]) - log_patcher.stop() self.finish_deferred.addCallback(handle_finish) self.endpoint.post(None, dummy_uaid) return self.finish_deferred def test_post_webpush_with_logged_stored(self): - import autopush.endpoint - log_patcher = patch.object(autopush.endpoint.log, "msg", spec=True) - mock_log = log_patcher.start() self.fernet_mock.decrypt.return_value = dummy_token self.endpoint.set_header = Mock() self.request_mock.headers["encryption"] = "stuff" @@ -1107,16 +1082,14 @@ def handle_finish(result): self.endpoint.set_status.assert_called_with(201) self.endpoint.set_header.assert_called_with( "Location", "Somewhere") - args, kwargs = mock_log.call_args + args, kwargs = self.endpoint.log.info.call_args eq_("Router miss, message stored.", args[0]) - log_patcher.stop() self.finish_deferred.addCallback(handle_finish) self.endpoint.post(None, dummy_uaid) return self.finish_deferred - @patch("twisted.python.log") - def test_post_db_error_in_routing(self, mock_log): + def test_post_db_error_in_routing(self): from autopush.router.interface import RouterException self.fernet_mock.decrypt.return_value = dummy_token self.endpoint.set_header = Mock() @@ -1207,8 +1180,7 @@ def test_cors_options(self): eq_(endpoint._headers[ch3], self.CORS_HEADERS) eq_(endpoint._headers[ch4], self.CORS_RESPONSE_HEADERS) - @patch_logger - def test_write_error(self, log_mock): + def test_write_error(self): """ Write error is triggered by sending the app a request with an invalid method (e.g. "put" instead of "PUT"). This is not code that is triggered within normal flow, but @@ -1224,10 +1196,9 @@ class testX(Exception): self.endpoint.write_error(999, exc_info=exc_info) self.status_mock.assert_called_with(999) - self.assertTrue(log_mock.err.called) + self.assertTrue(self.endpoint.log.called) - @patch_logger - def test_write_error_no_exc(self, log_mock): + def test_write_error_no_exc(self): """ Write error is triggered by sending the app a request with an invalid method (e.g. "put" instead of "PUT"). This is not code that is triggered within normal flow, but @@ -1235,7 +1206,7 @@ def test_write_error_no_exc(self, log_mock): """ self.endpoint.write_error(999) self.status_mock.assert_called_with(999) - self.assertTrue(log_mock.err.called) + self.assertTrue(self.endpoint.log.called) def _assert_error_response(self, result): self.status_mock.assert_called_with(500) diff --git a/autopush/tests/test_health.py b/autopush/tests/test_health.py index a90df3ec..8d683412 100644 --- a/autopush/tests/test_health.py +++ b/autopush/tests/test_health.py @@ -4,7 +4,7 @@ InternalServerError, ) from cyclone.web import Application -from mock import (Mock, patch) +from mock import Mock from moto import mock_dynamodb2 from twisted.internet.defer import Deferred from twisted.trial import unittest @@ -20,10 +20,10 @@ class HealthTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger self.timeout = 0.5 twisted.internet.base.DelayedCall.debug = True - self.log_mock = patch("autopush.health.log").start() self.mock_dynamodb2 = mock_dynamodb2() self.mock_dynamodb2.start() @@ -36,6 +36,7 @@ def setUp(self): self.request_mock = Mock() self.health = HealthHandler(Application(), self.request_mock) + self.health.log = self.log_mock = Mock(spec=Logger) self.status_mock = self.health.set_status = Mock() self.write_mock = self.health.write = Mock() @@ -44,7 +45,6 @@ def setUp(self): def tearDown(self): self.mock_dynamodb2.stop() - self.log_mock.stop() def test_healthy(self): return self._assert_reply({ diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index 95cb62b0..00537223 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -290,7 +290,7 @@ def ack(self, channel, version): def disconnect(self): self.ws.close() - def sleep(self, duration): + def sleep(self, duration): # pragma: nocover time.sleep(duration) def wait_for(self, func): diff --git a/autopush/tests/test_logging.py b/autopush/tests/test_logging.py index 27a29e22..85eb5945 100644 --- a/autopush/tests/test_logging.py +++ b/autopush/tests/test_logging.py @@ -7,9 +7,12 @@ from nose.tools import eq_, ok_ from twisted.internet import reactor from twisted.internet.defer import Deferred -from twisted.python import log, failure +from twisted.logger import Logger +from twisted.python import failure -from autopush.logging import setup_logging +from autopush.logging import PushLogger + +log = Logger() class SentryLogTestCase(twisted.trial.unittest.TestCase): @@ -25,10 +28,10 @@ def tearDown(self): def test_sentry_logging(self): os.environ["SENTRY_DSN"] = "some_locale" - setup_logging("Autopush") + PushLogger.setup_logging("Autopush", sentry_dsn=True) eq_(len(self.mock_raven.mock_calls), 2) - log.err(failure.Failure(Exception("eek"))) + log.failure("error", failure.Failure(Exception("eek"))) self.flushLoggedErrors() d = Deferred() @@ -43,21 +46,38 @@ def check(): return d -class EliotLogTestCase(twisted.trial.unittest.TestCase): +class PushLoggerTestCase(twisted.trial.unittest.TestCase): def test_custom_type(self): - setup_logging("Autopush") - with patch("sys.stdout") as mock_stdout: - log.msg("omg!", Type=7) - eq_(len(mock_stdout.mock_calls), 1) - kwargs = mock_stdout.mock_calls[0][1][0] + obj = PushLogger.setup_logging("Autopush") + obj._output = mock_stdout = Mock() + log.info("omg!", Type=7) + eq_(len(mock_stdout.mock_calls), 2) + kwargs = mock_stdout.mock_calls[0][1][0] ok_("Type" in kwargs) def test_human_logs(self): - setup_logging("Autopush", True) - with patch("sys.stdout") as mock_stdout: - mock_stdout.reset_mock() - log.msg("omg!", Type=7) - eq_(len(mock_stdout.mock_calls), 4) - mock_stdout.reset_mock() - log.err("wtf!", Type=7) - eq_(len(mock_stdout.mock_calls), 4) + obj = PushLogger.setup_logging("Autopush", log_format="text") + obj._output = mock_stdout = Mock() + log.info("omg!", Type=7) + eq_(len(mock_stdout.mock_calls), 2) + mock_stdout.reset_mock() + log.error("wtf!", Type=7) + eq_(len(mock_stdout.mock_calls), 2) + + def test_start_stop(self): + obj = PushLogger.setup_logging("Autopush") + obj.start() + obj.stop() + + def test_file_output(self): + try: + os.unlink("testfile.txt") + except: # pragma: nocover + pass + obj = PushLogger.setup_logging("Autoput", log_output="testfile.txt") + obj.start() + log.info("wow") + obj.stop() + with open("testfile.txt") as f: + lines = f.readlines() + eq_(len(lines), 1) diff --git a/autopush/tests/test_router.py b/autopush/tests/test_router.py index 29e2dd1f..3c14c594 100644 --- a/autopush/tests/test_router.py +++ b/autopush/tests/test_router.py @@ -8,7 +8,6 @@ from nose.tools import eq_, ok_ from twisted.trial import unittest from twisted.internet.error import ConnectError, ConnectionRefusedError -from twisted.python import log import apns import gcmclient @@ -82,6 +81,7 @@ def init(self, settings, router_conf): class APNSRouterTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger settings = AutopushSettings( hostname="localhost", statsd_host=None, @@ -90,6 +90,7 @@ def setUp(self): self.mock_apns = Mock(spec=apns.APNs) self.router = APNSRouter(settings, apns_config) self.router.apns = self.mock_apns + self.router.log = Mock(spec=Logger) self.notif = Notification(10, "data", dummy_chid, None, 200) self.router_data = dict(router_data=dict(token="connect_data")) @@ -430,12 +431,14 @@ def test_ammend(self): class SimplePushRouterTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger settings = AutopushSettings( hostname="localhost", statsd_host=None, ) self.router = SimpleRouter(settings, {}) + self.router.log = Mock(spec=Logger) self.notif = Notification(10, "data", dummy_chid, None, 200) mock_result = Mock(spec=gcmclient.gcm.Result) mock_result.canonical = dict() @@ -447,7 +450,6 @@ def setUp(self): self.agent_mock = Mock(spec=settings.agent) settings.agent = self.agent_mock self.router.metrics = Mock() - self.errors = [] def tearDown(self): dead_cache.clear() @@ -480,27 +482,17 @@ def verify_deliver(result): d.addBoth(verify_deliver) return d - def _mockObserver(self, event): - self.errors.append(event) - - def _contains_err(self, string): - for err in self.errors: - if string in err['message'][0]: - return True - return False # pragma: nocover - def test_route_connection_fail_saved(self): self.agent_mock.request.side_effect = MockAssist( [self._raise_connection_refused_error]) router_data = dict(node_id="http://somewhere", uaid=dummy_uaid) self.router_mock.clear_node.return_value = None self.router_mock.get_uaid.return_value = {} - log.addObserver(self._mockObserver) self.storage_mock.save_notification.return_value = True d = self.router.route_notification(self.notif, router_data) def verify_deliver(reply): - ok_(self._contains_err('ConnectionRefusedError')) + eq_(len(self.router.log.debug.mock_calls), 1) ok_(reply.status_code, 202) eq_(len(self.router_mock.clear_node.mock_calls), 1) diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index 7170bb3d..248e4839 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -47,8 +47,10 @@ def tearDown(): class WebsocketTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger twisted.internet.base.DelayedCall.debug = True self.proto = PushServerProtocol() + self.proto.log = Mock(spec=Logger) settings = AutopushSettings( hostname="localhost", @@ -104,7 +106,6 @@ def handle_message(result): return d def test_exc_catcher(self): - self.proto.log_err = Mock() req = Mock() def raise_error(*args, **kwargs): @@ -977,8 +978,6 @@ def test_unregister_with_webpush(self): assert self.proto.force_retry.called def test_ws_unregister(self): - patcher = patch("autopush.websocket.log", spec=True) - mock_log = patcher.start() self._connect() self._send_message(dict(messageType="hello", channelIDs=[])) @@ -989,8 +988,7 @@ def test_ws_unregister(self): def check_unregister_result(msg): eq_(msg["status"], 200) eq_(msg["channelID"], chid) - eq_(len(mock_log.mock_calls), 1) - patcher.stop() + eq_(len(self.proto.log.mock_calls), 1) d.callback(True) def check_hello_result(msg): @@ -1040,14 +1038,11 @@ def check_unregister_result(msg): return d def test_ws_unregister_fail(self): - patcher = patch('autopush.websocket.log', spec=True) - mock_log = patcher.start() self._connect() self.proto.ps.uaid = str(uuid.uuid4()) chid = str(uuid.uuid4()) d = Deferred() - d.addBoth(lambda x: patcher.stop()) # Replace storage delete with call to fail table = self.proto.ap_settings.storage.table @@ -1063,9 +1058,9 @@ def raise_exception(*args, **kwargs): channelID=chid)) def wait_for_times(): # pragma: nocover - if len(mock_log.mock_calls) > 0: + if len(self.proto.log.failure.mock_calls) > 0: try: - eq_(len(mock_log.mock_calls), 1) + eq_(len(self.proto.log.failure.mock_calls), 1) finally: d.callback(True) return @@ -1133,14 +1128,11 @@ def test_notification_avoid_newer_delivery(self): eq_(args, None) def test_ack(self): - patcher = patch('autopush.websocket.log', spec=True) - mock_log = patcher.start() self._connect() self._send_message(dict(messageType="hello", channelIDs=[])) d = Deferred() chid = str(uuid.uuid4()) - d.addBoth(lambda x: patcher.stop()) # stick a notification to ack in self.proto.ps.direct_updates[chid] = 12 @@ -1155,8 +1147,8 @@ def check_hello_result(msg): # Verify it was cleared out eq_(len(self.proto.ps.direct_updates), 0) - eq_(len(mock_log.mock_calls), 1) - args, kwargs = mock_log.msg.call_args + eq_(len(self.proto.log.info.mock_calls), 1) + args, kwargs = self.proto.log.info.call_args eq_(args[0], "Ack") eq_(kwargs["router_key"], "simplepush") eq_(kwargs["message_source"], "direct") @@ -1171,8 +1163,7 @@ def test_ack_with_bad_input(self): self._connect() eq_(self.proto.ack_update(None), None) - @patch('autopush.websocket.log', spec=True) - def test_ack_with_webpush_direct(self, mock_log): + def test_ack_with_webpush_direct(self): self._connect() self.proto.ps.uaid = str(uuid.uuid4()) chid = str(uuid.uuid4()) @@ -1188,14 +1179,13 @@ def test_ack_with_webpush_direct(self, mock_log): version="bleh:asdjfilajsdilfj" )) eq_(self.proto.ps.direct_updates[chid], []) - eq_(len(mock_log.mock_calls), 1) - args, kwargs = mock_log.msg.call_args + eq_(len(self.proto.log.info.mock_calls), 1) + args, kwargs = self.proto.log.info.call_args eq_(args[0], "Ack") eq_(kwargs["router_key"], "webpush") eq_(kwargs["message_source"], "direct") - @patch('autopush.websocket.log', spec=True) - def test_ack_with_webpush_from_storage(self, mock_log): + def test_ack_with_webpush_from_storage(self): self._connect() chid = str(uuid.uuid4()) self.proto.ps.use_webpush = True @@ -1214,14 +1204,13 @@ def test_ack_with_webpush_from_storage(self, mock_log): )) assert self.proto.force_retry.called assert mock_defer.addBoth.called - eq_(len(mock_log.mock_calls), 1) - args, kwargs = mock_log.msg.call_args + eq_(len(self.proto.log.info.mock_calls), 1) + args, kwargs = self.proto.log.info.call_args eq_(args[0], "Ack") eq_(kwargs["router_key"], "webpush") eq_(kwargs["message_source"], "stored") - @patch('autopush.websocket.log', spec=True) - def test_nack(self, mock_log): + def test_nack(self): self._connect() self.proto.ps.uaid = str(uuid.uuid4()) self.proto.onMessage(json.dumps(dict( @@ -1229,17 +1218,16 @@ def test_nack(self, mock_log): version="bleh:asdfhjklhjkl", code=200 )), False) - eq_(len(mock_log.mock_calls), 1) + eq_(len(self.proto.log.info.mock_calls), 1) - @patch('autopush.websocket.log', spec=True) - def test_nack_no_version(self, mock_log): + def test_nack_no_version(self): self._connect() self.proto.ps.uaid = str(uuid.uuid4()) self.proto.onMessage(json.dumps(dict( messageType="nack", code=200 )), False) - eq_(len(mock_log.mock_calls), 0) + eq_(len(self.proto.log.info.mock_calls), 0) def test_ack_remove(self): self._connect() @@ -1382,14 +1370,13 @@ def throw_error(*args, **kwargs): self.proto.ap_settings.storage = Mock( **{"fetch_notifications.side_effect": throw_error}) self.proto.ps._check_notifications = True - self.proto.log_err = Mock() self.proto.process_notifications() d = Deferred() def check_error(result): eq_(self.proto.ps._check_notifications, False) - ok_(self.proto.log_err.called) + ok_(self.proto.log.failure.called) d.callback(True) self.proto.ps._notification_fetch.addBoth(check_error) diff --git a/autopush/utils.py b/autopush/utils.py index 2f03597f..83403c09 100644 --- a/autopush/utils.py +++ b/autopush/utils.py @@ -8,7 +8,8 @@ import ecdsa from jose import jws -from twisted.python import failure, log +from twisted.logger import Logger +from twisted.python import failure default_ports = { @@ -102,7 +103,8 @@ def extract_jwt(token, crypto_key): return jws.verify(token, vk, algorithms=["ES256"]) -class ErrorLogger (object): +class ErrorLogger(object): + log = Logger() def write_error(self, code, **kwargs): """Write the error (otherwise unhandled exception when dealing with @@ -113,8 +115,9 @@ def write_error(self, code, **kwargs): """ self.set_status(code) if "exc_info" in kwargs: - log.err(failure.Failure(*kwargs["exc_info"]), - **self._client_info) + self.log.failure(failure.Failure(*kwargs["exc_info"]), + **self._client_info) else: - log.err("Error in handler: %s" % code, **self._client_info) + self.log.failure("Error in handler: %s" % code, + **self._client_info) self.finish() diff --git a/autopush/websocket.py b/autopush/websocket.py index 00fde0b1..62016d72 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -52,8 +52,9 @@ ) from twisted.internet.interfaces import IProducer from twisted.internet.threads import deferToThread +from twisted.logger import Logger from twisted.protocols import policies -from twisted.python import failure, log +from twisted.python import failure from zope.interface import implements from twisted.web.resource import Resource @@ -103,7 +104,7 @@ def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except Exception: - self.log_err(failure.Failure()) + self.log_failure(failure.Failure()) return wrapper @@ -226,6 +227,7 @@ def stopProducing(self): class PushServerProtocol(WebSocketServerProtocol, policies.TimeoutMixin): """Main Websocket Connection Protocol""" + log = Logger() # Testing purposes parent_class = WebSocketServerProtocol @@ -280,7 +282,7 @@ def force_retry(self, func, *args, **kwargs): def wrapper(result, *w_args, **w_kwargs): if isinstance(result, failure.Failure): # This is an exception, log it - self.log_err(result) + self.log_failure(result) d = deferToThread(func, *args, **kwargs) d.addErrback(wrapper) @@ -295,9 +297,9 @@ def base_tags(self): bug""" return self.ps._base_tags if self.ps._base_tags else None - def log_err(self, failure, **kwargs): - """Log a twisted failure out through twisted's log.err""" - log.err(failure, **kwargs) + def log_failure(self, failure, **kwargs): + """Log a twisted failure out through twisted's log.failure""" + self.log.failure("Unexpected error", failure, **kwargs) @property def paused(self): @@ -499,7 +501,7 @@ def _save_webpush_notif(self, notif): message_id=notif.version, ttl=notif.ttl, timestamp=notif.timestamp, - ).addErrback(self.log_err) + ).addErrback(self.log_failure) def _save_simple_notif(self, channel_id, version): """Save a simplepush notification""" @@ -508,7 +510,7 @@ def _save_simple_notif(self, channel_id, version): uaid=self.ps.uaid, chid=channel_id, version=version, - ).addErrback(self.log_err) + ).addErrback(self.log_failure) def _lookup_node(self, results): """Looks up the node to send a notify for it to check storage if @@ -519,7 +521,8 @@ def _lookup_node(self, results): self.ps.uaid ) d.addCallback(self._notify_node) - d.addErrback(self.log_err, extra="Failed to get UAID for redeliver") + d.addErrback(self.log_failure, + extra="Failed to get UAID for redeliver") def _notify_node(self, result): """Checks the result of lookup node to send the notify if the client is @@ -543,7 +546,7 @@ def _notify_node(self, result): "PUT", url.encode("utf8"), ).addCallback(IgnoreBody.ignore) - d.addErrback(self.log_err, extra="Failed to notify node") + d.addErrback(self.log_failure, extra="Failed to notify node") def returnError(self, messageType, reason, statusCode, close=True): """Return an error to a client, and optionally shut down the connection @@ -657,7 +660,7 @@ def _register_user(self, existing_user=True): def err_hello(self, failure): """errBack for hello failures""" self.transport.resumeProducing() - self.log_err(failure) + self.log_failure(failure) self.returnError("hello", "error", 503) def _copy_new_data(self, result): @@ -732,7 +735,7 @@ def _check_other_nodes(self, result): ResponseNeverReceived, ConnectionLost, ConnectionDone)) - d.addErrback(self.log_err, + d.addErrback(self.log_failure, extra="Failed to delete old node") # UDP clients are done at this point and timed out to ensure they @@ -835,7 +838,7 @@ def process_notifications(self): def error_notifications(self, fail): """errBack for notification check failing""" # If we error'd out on this important check, we drop the connection - self.log_err(fail) + self.log_failure(fail) self.sendClose() def finish_notifications(self, notifs): @@ -933,7 +936,7 @@ def _rotate_message_table(self): d.addCallback(self._register_rotated_channels) d.addErrback(self.trap_cancel) d.addErrback(self.err_overload, "notif") - d.addErrback(self.log_err) + d.addErrback(self.log_failure) def _register_rotated_channels(self, result): """Register the channels into a new entry in the current month""" @@ -1052,8 +1055,9 @@ def process_unregister(self, data): # Log out the unregister if it has a code in it if "code" in data: code = extract_code(data) - log.msg("Unregister", channelID=chid, uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Unregister", channelID=chid, + uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) # Clear out any existing tracked messages for this channel if self.ps.use_webpush: @@ -1112,10 +1116,10 @@ def ver_filter(update): if found: msg = found[0] size = len(msg.data) if msg.data else 0 - log.msg("Ack", router_key="webpush", channelID=chid, - message_id=version, message_source="direct", - message_size=size, uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="webpush", channelID=chid, + message_id=version, message_source="direct", + message_size=size, uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) self.ps.direct_updates[chid].remove(msg) return @@ -1123,10 +1127,10 @@ def ver_filter(update): if found: msg = found[0] size = len(msg.data) if msg.data else 0 - log.msg("Ack", router_key="webpush", channelID=chid, - message_id=version, message_source="stored", - message_size=size, uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="webpush", channelID=chid, + message_id=version, message_source="stored", + message_size=size, uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) d = self.force_retry(self.ps.message.delete_message, uaid=self.ps.uaid, channel_id=chid, @@ -1156,15 +1160,15 @@ def _handle_simple_ack(self, chid, version, code): if chid in self.ps.direct_updates and \ self.ps.direct_updates[chid] <= version: del self.ps.direct_updates[chid] - log.msg("Ack", router_key="simplepush", channelID=chid, - message_id=version, message_source="direct", - uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="simplepush", channelID=chid, + message_id=version, message_source="direct", + uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) return - log.msg("Ack", router_key="simplepush", channelID=chid, - message_id=version, message_source="stored", - uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="simplepush", channelID=chid, + message_id=version, message_source="stored", + uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) if chid in self.ps.updates_sent and \ self.ps.updates_sent[chid] <= version: del self.ps.updates_sent[chid] @@ -1199,9 +1203,9 @@ def process_nack(self, data): version, updateid = version.split(":") - log.msg("Nack", uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, message_id=version, - code=code) + self.log.info("Nack", uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, message_id=version, + code=code) def check_missed_notifications(self, results, resume=False): """Check to see if notifications were missed""" diff --git a/doc-requirements.txt b/doc-requirements.txt index f13824c4..a6351832 100644 --- a/doc-requirements.txt +++ b/doc-requirements.txt @@ -17,7 +17,6 @@ cyclone==1.1 datadog==0.5.0 decorator==4.0.0 ecdsa==0.13 -eliot==0.7.1 enum34==1.0.4 funcsigs==0.4 gcm-client==0.1.4 diff --git a/requirements.txt b/requirements.txt index 9cbc7d0a..55e50a50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,6 @@ cyclone==1.1 datadog==0.5.0 decorator==4.0.0 ecdsa==0.13 -eliot==0.11.0 enum34==1.0.4 funcsigs==0.4 gcm-client==0.1.4 diff --git a/test-requirements.txt b/test-requirements.txt index 0fb0ebb2..e3cb38a9 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -21,7 +21,6 @@ cyclone==1.1 datadog==0.5.0 decorator==4.0.0 ecdsa==0.13 -eliot==0.11.0 enum34==1.0.4 funcsigs==0.4 gcm-client==0.1.4