diff --git a/.travis.yml b/.travis.yml index 24336d53..9f24f86f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,10 @@ language: python sudo: false python: - - "pypy" + - "2.7" install: - - wget https://bitbucket.org/pypy/pypy/downloads/pypy-5.0.0-linux64.tar.bz2 - - tar xjvf pypy-5.0.0-linux64.tar.bz2 - - virtualenv -p pypy-5.0.0-linux64/bin/pypy pypy - - source pypy/bin/activate + - virtualenv pypy + - ln -s pypy/bin/python pypy/bin/pypy - make travis script: tox -- --with-coverage --cover-xml --cover-package=autopush after_success: diff --git a/autopush/db.py b/autopush/db.py index 6c45f650..ba8c2fc4 100644 --- a/autopush/db.py +++ b/autopush/db.py @@ -2,7 +2,6 @@ from __future__ import absolute_import import datetime -import logging import random import time import uuid @@ -21,9 +20,6 @@ from autopush.utils import generate_hash - -log = logging.getLogger(__file__) - key_hash = "" TRACK_DB_CALLS = False DB_CALLS = [] @@ -53,8 +49,6 @@ def hasher(uaid): def normalize_id(id): - if not id: - return id if (len(id) == 36 and id[8] == id[13] == id[18] == id[23] == '-'): return id.lower() @@ -434,7 +428,6 @@ def update_message(self, uaid, channel_id, message_id, ttl, data=None, else: expr += " REMOVE #dd, headers" expr_values = self.encode({":%s" % k: v for k, v in item.items()}) - log.debug(expr_values) conn.update_item( self.table.table_name, db_key, diff --git a/autopush/endpoint.py b/autopush/endpoint.py index 0677d95e..a3915479 100644 --- a/autopush/endpoint.py +++ b/autopush/endpoint.py @@ -38,7 +38,6 @@ from cryptography.fernet import InvalidToken from twisted.internet.defer import Deferred from twisted.internet.threads import deferToThread -from twisted.python import log from autopush.db import ( generate_last_connect, @@ -220,13 +219,13 @@ def _response_err(self, fail): running. 
""" - log.err(fail, **self._client_info) + self.log.failure(fail, **self._client_info) self._write_response(500, 999) def _overload_err(self, fail): """errBack for throughput provisioned exceptions""" fail.trap(ProvisionedThroughputExceededException) - log.msg("Throughput Exceeded", **self._client_info) + self.log.info("Throughput Exceeded", **self._client_info) self._write_response(503, 201) def _router_response(self, response): @@ -248,40 +247,40 @@ def _router_fail_err(self, fail): fail.trap(RouterException) exc = fail.value if exc.log_exception: - log.err(fail, **self._client_info) # pragma nocover + self.log.failure(fail, **self._client_info) # pragma nocover if 200 <= exc.status_code < 300: - log.msg("Success", status_code=exc.status_code, - logged_status=exc.logged_status or "", - **self._client_info) + self.log.info("Success", status_code=exc.status_code, + logged_status=exc.logged_status or "", + **self._client_info) elif 400 <= exc.status_code < 500: - log.msg("Client error", status_code=exc.status_code, - logged_status=exc.logged_status or "", - errno=exc.errno or "", - **self._client_info) + self.log.info("Client error", status_code=exc.status_code, + logged_status=exc.logged_status or "", + errno=exc.errno or "", + **self._client_info) self._router_response(exc) def _uaid_not_found_err(self, fail): """errBack for uaid lookup not finding the user""" fail.trap(ItemNotFound) - log.msg("UAID not found in AWS.", **self._client_info) + self.log.debug("UAID not found in AWS.", **self._client_info) self._write_response(410, 103) def _token_err(self, fail): """errBack for token decryption fail""" fail.trap(InvalidToken, InvalidTokenException) - log.msg("Invalid token", **self._client_info) + self.log.debug("Invalid token", **self._client_info) self._write_response(400, 102) def _auth_err(self, fail): """errBack for invalid auth token""" fail.trap(VapidAuthException) - log.msg("Invalid Auth token", **self._client_info) + self.log.debug("Invalid Auth token", **self._client_info) self._write_unauthorized_response(message=fail.value.message) def _chid_not_found_err(self, fail): """errBack for unknown chid""" fail.trap(ItemNotFound, ValueError) - log.msg("CHID not found in AWS.", **self._client_info) + self.log.debug("CHID not found in AWS.", **self._client_info) self._write_response(410, 106) ############################################################# @@ -307,7 +306,7 @@ def _invalid_auth(self, fail): message = fail.value.message or repr(fail.value) if isinstance(fail.value, AssertionError): message = "A decryption error occurred" - log.msg("Invalid bearer token: " + message, **self._client_info) + self.log.debug("Invalid bearer token: " + message, **self._client_info) raise VapidAuthException("Invalid bearer token: " + message) def _process_auth(self, result): @@ -380,7 +379,7 @@ def _delete_message(self, kind, uaid, chid): return d def _delete_completed(self, response): - log.msg("Message Deleted", status_code=204, **self._client_info) + self.log.info("Message Deleted", status_code=204, **self._client_info) self.set_status(204) self.finish() @@ -439,8 +438,8 @@ def _uaid_lookup_results(self, result): try: self.router = self.ap_settings.routers[router_key] except KeyError: - log.msg("Invalid router requested", status_code=400, errno=108, - **self._client_info) + self.log.debug("Invalid router requested", status_code=400, + errno=108, **self._client_info) return self._write_response(400, 108, message="Invalid router") @@ -457,13 +456,13 @@ def _uaid_lookup_results(self, result): 
req_fields = ["content-encoding", "encryption"] if data and not all([x in self.request.headers for x in req_fields]): - log.msg("Client error", status_code=400, errno=101, - **self._client_info) + self.log.debug("Client error", status_code=400, errno=101, + **self._client_info) return self._write_response(400, 101) if ("encryption-key" in self.request.headers and "crypto-key" in self.request.headers): - log.msg("Client error", status_code=400, errno=110, - **self._client_info) + self.log.debug("Client error", status_code=400, errno=110, + **self._client_info) return self._write_response( 400, 110, message="Invalid crypto headers") self._client_info["message_size"] = len(data) if data else 0 @@ -478,13 +477,13 @@ def _uaid_lookup_results(self, result): # Cap the TTL to our MAX_TTL ttl = min(ttl, MAX_TTL) else: - log.msg("Client error", status_code=400, - errno=112, **self._client_info) + self.log.debug("Client error", status_code=400, + errno=112, **self._client_info) return self._write_response(400, 112, message="Invalid TTL header") if data and len(data) > self.ap_settings.max_data: - log.msg("Client error", status_code=400, errno=104, - **self._client_info) + self.log.debug("Client error", status_code=400, errno=104, + **self._client_info) return self._write_response( 413, 104, message="Data payload too large") @@ -538,9 +537,10 @@ def _router_completed(self, response, uaid_data): return d else: if response.status_code == 200 or response.logged_status == 200: - log.msg("Successful delivery", **self._client_info) + self.log.info("Successful delivery", **self._client_info) elif response.status_code == 202 or response.logged_status == 202: - log.msg("Router miss, message stored.", **self._client_info) + self.log.info("Router miss, message stored.", + **self._client_info) time_diff = time.time() - self.start_time self.metrics.timing("updates.handled", duration=time_diff) self._router_response(response) @@ -592,7 +592,7 @@ def post(self, router_type="", router_token="", uaid="", chid=""): # normalize the path vars into parameters if router_type not in self.ap_settings.routers: - log.msg("Invalid router requested", **self._client_info) + self.log.debug("Invalid router requested", **self._client_info) return self._write_response( 400, 108, message="Invalid router") router = self.ap_settings.routers[router_type] @@ -645,7 +645,7 @@ def put(self, router_type="", router_token="", uaid="", chid=""): self.uaid = uaid router_data = params if router_type not in self.ap_settings.routers or not router_data: - log.msg("Invalid router requested", **self._client_info) + self.log.debug("Invalid router requested", **self._client_info) return self._write_response( 400, 108, message="Invalid router") router = self.ap_settings.routers[router_type] @@ -693,7 +693,7 @@ def delete(self, router_type="", router_token="", uaid="", chid=""): return self._write_unauthorized_response( message="Invalid Authentication") if router_type not in self.ap_settings.routers: - log.msg("Invalid router requested", **self._client_info) + self.log.debug("Invalid router requested", **self._client_info) return self._write_response( 400, 108, message="Invalid router") router = self.ap_settings.routers[router_type] @@ -764,7 +764,7 @@ def _return_endpoint(self, endpoint_data, new_uaid, router=None): else: msg = dict(channelID=self.chid, endpoint=endpoint_data[0]) self.write(json.dumps(msg)) - log.msg("Endpoint registered via HTTP", **self._client_info) + self.log.debug("Endpoint registered via HTTP", **self._client_info) self.finish() 
def _success(self, result): diff --git a/autopush/health.py b/autopush/health.py index 7d5dd238..00a88f4a 100644 --- a/autopush/health.py +++ b/autopush/health.py @@ -6,7 +6,7 @@ ) from twisted.internet.defer import DeferredList from twisted.internet.threads import deferToThread -from twisted.python import log +from twisted.logger import Logger from autopush import __version__ @@ -18,6 +18,8 @@ class MissingTableException(Exception): class HealthHandler(cyclone.web.RequestHandler): """HTTP Health Handler""" + log = Logger() + @cyclone.web.asynchronous def get(self): """HTTP Get @@ -54,7 +56,7 @@ def _check_success(self, result, name): def _check_error(self, failure, name): """Returns an error, and why""" self._healthy = False - log.err(failure, name) + self.log.failure(failure, name) cause = self._health_checks[name] = {"status": "NOT OK"} if failure.check(InternalServerError): diff --git a/autopush/logging.py b/autopush/logging.py index e1f2d7ba..b7935d45 100644 --- a/autopush/logging.py +++ b/autopush/logging.py @@ -1,120 +1,130 @@ """Custom Logging Setup - -This module sets up eliot structured logging, intercepts stdout output from -twisted, and pipes it through eliot for later processing into Kibana per -Mozilla Services standard structured logging. - """ -# TWISTED_LOG_MESSAGE and EliotObserver licensed under APL 2.0 from -# ClusterHQ/flocker -# https://github.com/ClusterHQ/flocker/blob/master/flocker/common/script.py#L81-L106 +import io import json -import os import pkg_resources import socket import sys import raven -from eliot import (add_destination, fields, - Logger, MessageType) from twisted.internet import reactor -from twisted.python.log import textFromEventDict, startLoggingWithObserver +from twisted.logger import ( + formatEvent, + formatEventAsClassicLogText, + globalLogPublisher, + LogLevel, + ILogObserver +) +from zope.interface import implementer HOSTNAME = socket.getfqdn() -LOGGER = None -TWISTED_LOG_MESSAGE = MessageType("twisted:log", - fields(error=bool, message=unicode), - u"A log message from Twisted.") -HUMAN = False - -class EliotObserver(object): - """A Twisted log observer that logs to Eliot""" - def __init__(self): - """Create the Eliot Observer""" - if os.environ.get("SENTRY_DSN"): +# A complete set of keys we don't include in Fields from a log event +IGNORED_KEYS = frozenset([ + "factory", + "failure", + "format", + "isError", + "log_format", + "log_flattened", + "log_level", + "log_legacy", + "log_logger", + "log_source", + "log_system", + "log_text", + "log_time", + "log_trace", + "message", + "message_type", + "severity", + "task_level", + "time", + "timestamp", + "type", + "why", +]) + + +@implementer(ILogObserver) +class PushLogger(object): + def __init__(self, logger_name, log_level="debug", log_format="json", + log_output="stdout", sentry_dsn=None): + self.logger_name = "-".join([ + logger_name, + pkg_resources.get_distribution("autopush").version + ]) + self._filename = None + self.log_level = LogLevel.lookupByName(log_level) + if log_output == "stdout": + self._output = sys.stdout + else: + self._filename = log_output + self._output = "file" + if log_format == "json": + self.format_event = self.json_format + else: + self.format_event = formatEventAsClassicLogText + if sentry_dsn: self.raven_client = raven.Client( release=raven.fetch_package_version("autopush")) else: self.raven_client = None - self.logger = Logger() - - def raven_log(self, event): - """Log out twisted exception failures to Raven""" - f = event['failure'] - # Throw back to event loop 
- reactor.callFromThread( - self.raven_client.captureException, - (f.type, f.value, f.getTracebackObject()) - ) - def __call__(self, msg): - """Called to log out messages""" - error = bool(msg.get("isError")) + def __call__(self, event): + if event["log_level"] < self.log_level: + return - if self.raven_client and 'failure' in msg: - self.raven_log(msg) - error = True + if self.raven_client and 'failure' in event: + f = event["failure"] + reactor.callFromThread( + self.raven_client.captureException, + (f.type, f.value, f.getTracebackObject()) + ) - # Twisted log messages on Python 2 are bytes. We don't know the - # encoding, but assume it's ASCII superset. Charmap will translate - # ASCII correctly, and higher-bit characters just map to - # corresponding Unicode code points, and will never fail at decoding. - message = unicode(textFromEventDict(msg), "charmap") - kw = msg.copy() - for key in ["message", "isError", "failure", "why", "format"]: - kw.pop(key, None) - TWISTED_LOG_MESSAGE(error=error, message=message, **kw).write( - self.logger) + text = self.format_event(event) + self._output.write(unicode(text)) + self._output.flush() - def start(self): - """Start capturing Twisted logs.""" - startLoggingWithObserver(self, setStdout=False) + def json_format(self, event): + error = bool(event.get("isError")) or "failure" in event + ts = event["log_time"] - -def stdout(message): - """Format a message appropriately for structured logging capture of stdout - and then write it to stdout""" - if HUMAN: - if message['error']: - sys.stdout.write("ERROR: %s\n" % message['message']) + if error: + severity = 3 else: - sys.stdout.write(" %s\n" % message['message']) - return - msg = {} - ts = message.pop("timestamp") - message.pop("time") - del message["task_level"] - msg["Hostname"] = HOSTNAME - if message["error"]: - msg["Severity"] = 3 - else: - msg["Severity"] = 5 - - # 'message_type' is used by twisted logging. Preserve this to the - # 'Type' tag. 
- if "message_type" in message: - msg["Type"] = message.pop("message_type") - - for key in ["Severity", "type", "severity"]: - if key in message: - msg[key.title()] = message.pop(key) + severity = 5 + + msg = { + "Hostname": HOSTNAME, + "Timestamp": ts * 1000 * 1000 * 1000, + "Type": "twisted:log", + "Severity": event.get("severity") or severity, + "EnvVersion": "2.0", + "Fields": {k: v for k, v in event.iteritems() + if k not in IGNORED_KEYS and + type(v) in (str, unicode, list, int, float)}, + "Logger": self.logger_name, + } + # Add the nicely formatted message + msg["Fields"]["message"] = formatEvent(event) + return json.dumps(msg, skipkeys=True) + "\n" - msg["Timestamp"] = ts * 1000 * 1000 * 1000 - msg["Fields"] = {k: v for k, v in message.items() - if not k.startswith("log_")} - msg["EnvVersion"] = "2.0" - msg["Logger"] = LOGGER - sys.stdout.write(json.dumps(msg, skipkeys=True) + "\n") - - -def setup_logging(logger_name, human=False): - """Patch in the Eliot logger and twisted log interception""" - global LOGGER, HUMAN - LOGGER = "-".join([logger_name, - pkg_resources.get_distribution("autopush").version]) - HUMAN = human - add_destination(stdout) - ellie = EliotObserver() - ellie.start() - return ellie + def start(self): + if self._filename: + self._output = io.open(self._filename, "a", encoding="utf-8") + globalLogPublisher.addObserver(self) + + def stop(self): + globalLogPublisher.removeObserver(self) + if self._filename: + self._output.close() + self._output = None + + @classmethod + def setup_logging(cls, logger_name, log_level="info", log_format="json", + log_output="stdout", sentry_dsn=None): + pl = cls(logger_name, log_level=log_level, log_format=log_format, + log_output=log_output, sentry_dsn=sentry_dsn) + pl.start() + return pl diff --git a/autopush/main.py b/autopush/main.py index e35e423f..00a3f931 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -1,21 +1,23 @@ """autopush/autoendpoint daemon scripts""" +import json +import os + import configargparse import cyclone.web -import json -import autopush.db as db -from autobahn.twisted.websocket import WebSocketServerFactory from autobahn.twisted.resource import WebSocketResource +from autobahn.twisted.websocket import WebSocketServerFactory from twisted.internet import reactor, task -from twisted.python import log +from twisted.logger import Logger from twisted.web.server import Site +import autopush.db as db from autopush.endpoint import ( EndpointHandler, MessageHandler, RegistrationHandler, ) from autopush.health import (HealthHandler, StatusHandler) -from autopush.logging import setup_logging +from autopush.logging import PushLogger from autopush.settings import AutopushSettings from autopush.ssl import AutopushSSLContextFactory from autopush.websocket import ( @@ -34,6 +36,7 @@ '~/.autopush_shared.ini', '.autopush_shared.ini', ] +log = Logger() def add_shared_args(parser): @@ -43,6 +46,10 @@ def add_shared_args(parser): dest='config_file', is_config_file=True) parser.add_argument('--debug', help='Debug Info.', action="store_true", default=False, env_var="DEBUG") + parser.add_argument('--log_level', help='Log level to log', type=str, + default="", env_var="LOG_LEVEL") + parser.add_argument('--log_output', help="Log output, stdout or filename", + default="stdout", env_var="LOG_OUTPUT") parser.add_argument('--crypto_key', help="Crypto key for tokens", default=[], env_var="CRYPTO_KEY", type=str, action="append") @@ -124,7 +131,6 @@ def obsolete_args(parser): These are included to prevent startup errors with old config 
files. """ - parser.add_argument('--log_level', help="OBSOLETE", type=int) parser.add_argument('--external_router', action="store_true", help='OBSOLETE') parser.add_argument('--max_message_size', type=int, help="OBSOLETE") @@ -297,7 +303,7 @@ def make_settings(args, **kwargs): # unaccounted. senderID = senderIDs.choose_ID() if senderID is None: - log.err("No GCM SenderIDs specified or found.") + log.critical("No GCM SenderIDs specified or found.") return router_conf["gcm"] = {"ttl": args.gcm_ttl, "dryrun": args.gcm_dryrun, @@ -350,7 +356,12 @@ def mount_health_handlers(site, settings): def connection_main(sysargs=None, use_files=True): """Main entry point to setup a connection node, aka the autopush script""" args, parser = _parse_connection(sysargs, use_files) - setup_logging("Autopush", args.human_logs) + log_format = "text" if args.human_logs else "json" + log_level = args.log_level or ("debug" if args.debug else "info") + sentry_dsn = bool(os.environ.get("SENTRY_DSN")) + PushLogger.setup_logging("Autopush", log_level=log_level, + log_format=log_format, log_output=args.log_output, + sentry_dsn=sentry_dsn) settings = make_settings( args, port=args.port, @@ -445,11 +456,16 @@ def endpoint_main(sysargs=None, use_files=True): if args.senderid_list: try: senderid_list = json.loads(args.senderid_list) - except (ValueError, TypeError), x: - log.err("Invalid JSON specified for senderid_list.", x) + except (ValueError, TypeError): + log.critical("Invalid JSON specified for senderid_list") return - setup_logging("Autoendpoint", args.human_logs) + log_level = args.log_level or ("debug" if args.debug else "info") + log_format = "text" if args.human_logs else "json" + sentry_dsn = bool(os.environ.get("SENTRY_DSN")) + PushLogger.setup_logging("Autoendpoint", log_level=log_level, + log_format=log_format, log_output=args.log_output, + sentry_dsn=sentry_dsn) settings = make_settings( args, diff --git a/autopush/router/apnsrouter.py b/autopush/router/apnsrouter.py index 7c4b4c6a..d09027db 100644 --- a/autopush/router/apnsrouter.py +++ b/autopush/router/apnsrouter.py @@ -2,7 +2,7 @@ import time import apns -from twisted.python import log +from twisted.logger import Logger from twisted.internet.threads import deferToThread from autopush.router.interface import RouterException, RouterResponse @@ -11,6 +11,7 @@ # https://github.com/djacobs/PyAPNs class APNSRouter(object): """APNS Router Implementation""" + log = Logger() apns = None messages = {} errors = {0: 'No error', @@ -40,7 +41,7 @@ def __init__(self, ap_settings, router_conf): self.default_title = router_conf.get("default_title", "SimplePush") self.default_body = router_conf.get("default_body", "New Alert") self._connect() - log.msg("Starting APNS router...") + self.log.debug("Starting APNS router...") def register(self, uaid, router_data, *kwargs): """Validate that an APNs instance token is in the ``router_data``""" @@ -86,12 +87,13 @@ def _route(self, notification, router_data): def _error(self, err): """Error handler""" if err['status'] == 0: - log.msg("Success") + self.log.debug("Success") del self.messages[err['identifier']] return - log.err("APNs Error encountered: %s" % self.errors[err['status']]) + self.log.debug("APNs Error encountered: {status}", + status=self.errors[err['status']]) if err['status'] in [1, 255]: - log.msg("Retrying...") + self.log.debug("Retrying...") self._connect() resend = self.messages.get(err.get('identifier')) if resend is None: diff --git a/autopush/router/gcm.py b/autopush/router/gcm.py index 78bbe4ce..12111e97 
100644 --- a/autopush/router/gcm.py +++ b/autopush/router/gcm.py @@ -3,8 +3,8 @@ import json from base64 import urlsafe_b64encode -from twisted.python import log from twisted.internet.threads import deferToThread +from twisted.logger import Logger from autopush.router.interface import RouterException, RouterResponse from autopush.senderids import SenderIDs @@ -12,6 +12,7 @@ class GCMRouter(object): """GCM Router Implementation""" + log = Logger() gcm = None dryRun = 0 collapseKey = "simplepush" @@ -30,7 +31,7 @@ def __init__(self, ap_settings, router_conf): self.gcm = gcmclient.GCM(senderID.get("auth")) except: raise IOError("GCM Bridge not initiated in main") - log.msg("Starting GCM router...") + self.log.debug("Starting GCM router...") def check_token(self, token): if token not in self.senderIDs.senderIDs(): @@ -123,7 +124,7 @@ def _route(self, notification, router_data): def _error(self, err, status, **kwargs): """Error handler that raises the RouterException""" - log.err(err, **kwargs) + self.log.debug(err, **kwargs) return RouterException(err, status_code=status, response_body=err, **kwargs) @@ -133,14 +134,15 @@ def _process_reply(self, reply): # for reg_id, msg_id in reply.success.items(): # updates for old_id, new_id in reply.canonical.items(): - log.msg("GCM id changed : %s => " % old_id, new_id) + self.log.debug("GCM id changed : {old} => {new}", + old=old_id, new=new_id) return RouterResponse(status_code=503, response_body="Please try request again.", router_data=dict(token=new_id)) # naks: # uninstall: for reg_id in reply.not_registered: - log.msg("GCM no longer registered: %s" % reg_id) + self.log.debug("GCM no longer registered: %s" % reg_id) return RouterResponse( status_code=410, response_body="Endpoint requires client update", @@ -149,7 +151,8 @@ def _process_reply(self, reply): # for reg_id, err_code in reply.failed.items(): if len(reply.failed.items()) > 0: - log.msg("GCM failures: %s" % json.dumps(reply.failed.items())) + self.log.debug("GCM failures: {failed()}", + failed=lambda: json.dumps(reply.failed.items())) raise RouterException("GCM failure to deliver", status_code=503, response_body="Please try request later.") diff --git a/autopush/router/simple.py b/autopush/router/simple.py index a0e6b8ce..a92b1a59 100644 --- a/autopush/router/simple.py +++ b/autopush/router/simple.py @@ -26,7 +26,7 @@ ConnectionRefusedError, UserError ) -from twisted.python import log +from twisted.logger import Logger from twisted.web.client import FileBodyProducer from autopush.protocol import IgnoreBody @@ -48,6 +48,8 @@ class SimpleRouter(object): """Implements :class:`autopush.router.interface.IRouter` for internal routing to an Autopush node """ + log = Logger() + def __init__(self, ap_settings, router_conf): """Create a new SimpleRouter""" self.ap_settings = ap_settings @@ -107,7 +109,9 @@ def route_notification(self, notification, uaid_data): yield deferToThread(router.clear_node, uaid_data).addErrback(self._eat_db_err) if isinstance(exc, ConnectionRefusedError): - log.err("Could not route message: %s" % repr(exc)) + # Occurs if an IP record is now used by some other node + # in AWS. 
+ self.log.debug("Could not route message: {exc}", exc=exc) if result and result.code == 200: self.metrics.increment("router.broadcast.hit") returnValue(self.delivered_response(notification)) @@ -159,7 +163,7 @@ def route_notification(self, notification, uaid_data): self.metrics.increment("updates.client.host_gone") dead_cache.put(node_key(node_id), True) if isinstance(exc, ConnectionRefusedError): - log.err("Could not route message: %s" % repr(exc)) + self.log.debug("Could not route message: {exc}", exc=exc) yield deferToThread( router.clear_node, uaid_data).addErrback(self._eat_db_err) @@ -181,8 +185,9 @@ def route_notification(self, notification, uaid_data): data=urlencode(self.udp["data"]), cert=self.conf.get("cert"), timeout=self.conf.get("server_timeout", 3))) - except Exception, x: - log.err("Could not send UDP wake request:", str(x)) + except Exception as exc: + self.log.debug("Could not send UDP wake request: {exc}", + exc=exc) returnValue(retVal) ############################################################# diff --git a/autopush/senderids.py b/autopush/senderids.py index e2c4953b..8d094d79 100644 --- a/autopush/senderids.py +++ b/autopush/senderids.py @@ -30,9 +30,9 @@ from boto.s3.connection import S3Connection from boto.s3.key import Key from boto.exception import S3ResponseError -from twisted.python import log from twisted.internet.threads import deferToThread from twisted.internet.task import LoopingCall +from twisted.logger import Logger # re-read from source every 15 minutes or so. SENDERID_EXPRY = 15*60 @@ -41,6 +41,7 @@ class SenderIDs(object): """Handle Read, Write and cache of SenderID values from S3""" + log = Logger() _expry = SENDERID_EXPRY _senderIDs = {} @@ -58,14 +59,14 @@ def __init__(self, args): self.service = LoopingCall(self._refresh) if senderIDs: if type(senderIDs) is not dict: - log.err("senderid_list is not a dict. Ignoring") + self.log.critical("senderid_list is not a dict. Ignoring") else: # We're initializing, so it's ok to block. self.update(senderIDs) def start(self): if self._use_s3: - log.msg("Starting SenderID service...") + self.log.debug("Starting SenderID service...") self.service.start(self._expry) def _write(self, senderIDs, *args): @@ -93,8 +94,8 @@ def _update_senderIDs(self, *args): candidates = json.loads(key.get_contents_as_string()) if candidates: if type(candidates) is not dict: - log.err("Wrong data type stored for senderIDs. " - "Should be dict. Ignoring.") + self.log.critical("Wrong data type stored for senderIDs. " + "Should be dict. Ignoring.") return return candidates @@ -116,7 +117,7 @@ def update(self, senderIDs): if not senderIDs: return if type(senderIDs) is not dict: - log.err("Wrong data type for senderIDs. Should be dict.") + self.log.critical("Wrong data type for senderIDs. Should be dict.") return if not self._use_s3: # Skip using s3 (For debugging) @@ -148,5 +149,5 @@ def choose_ID(self): def stop(self): if self.service and self.service.running: - log.msg("Stopping SenderID service...") + self.log.debug("Stopping SenderID service...") self.service.stop() diff --git a/autopush/tests/test_endpoint.py b/autopush/tests/test_endpoint.py index 183ca9e0..af0b305c 100644 --- a/autopush/tests/test_endpoint.py +++ b/autopush/tests/test_endpoint.py @@ -1,4 +1,3 @@ -import functools import json import sys import time @@ -65,24 +64,6 @@ def write(self, data): self.file.write(data) -def patch_logger(test): - """Replaces the Twisted error logger with a mock implementation. 
- - This uses Trial's ``patch()`` method instead of Mock's ``@patch`` - decorator. The latter still causes the test to print a stack trace - and fail unless ``flushLoggedErrors()`` is called. - - """ - @functools.wraps(test) - def wrapper(self, *args, **kwargs): - log_mock = Mock() - self.patch(endpoint, 'log', log_mock) - self.patch(utils, 'log', log_mock) - params = args + (log_mock,) - return test(self, *params, **kwargs) - return wrapper - - class MessageTestCase(unittest.TestCase): def setUp(self): twisted.internet.base.DelayedCall.debug = True @@ -185,6 +166,7 @@ class EndpointTestCase(unittest.TestCase): @patch('uuid.uuid4', return_value=uuid.UUID(dummy_request_id)) def setUp(self, t): + from twisted.logger import Logger # this timeout *should* be set to 0.5, however Travis runs # so slow, that many of these tests will time out leading # to false failure rates and integration tests generally @@ -220,6 +202,7 @@ def setUp(self, t): self.wp_router_mock = settings.routers["webpush"] self.status_mock = self.endpoint.set_status = Mock() self.write_mock = self.endpoint.write = Mock() + self.endpoint.log = Mock(spec=Logger) d = self.finish_deferred = Deferred() self.endpoint.finish = lambda: d.callback(True) @@ -517,15 +500,14 @@ def handle_finish(result): self.endpoint.put(None, '') return self.finish_deferred - @patch_logger - def test_put_token_error(self, log_mock): + def test_put_token_error(self): self.fernet_mock.configure_mock(**{ 'decrypt.side_effect': TypeError}) self.endpoint.request.body = b'version=123' def handle_finish(value): self.fernet_mock.decrypt.assert_called_with(b'') - eq_(log_mock.err.called, True) + eq_(self.endpoint.log.failure.called, True) self._assert_error_response(value) self.finish_deferred.addCallback(handle_finish) @@ -1054,9 +1036,6 @@ def handle_finish(result): return self.finish_deferred def test_post_webpush_with_logged_delivered(self): - import autopush.endpoint - log_patcher = patch.object(autopush.endpoint.log, "msg", spec=True) - mock_log = log_patcher.start() self.fernet_mock.decrypt.return_value = dummy_token self.endpoint.set_header = Mock() self.request_mock.headers["encryption"] = "stuff" @@ -1076,18 +1055,14 @@ def handle_finish(result): self.endpoint.set_status.assert_called_with(201) self.endpoint.set_header.assert_called_with( "Location", "Somewhere") - args, kwargs = mock_log.call_args + args, kwargs = self.endpoint.log.info.call_args eq_("Successful delivery", args[0]) - log_patcher.stop() self.finish_deferred.addCallback(handle_finish) self.endpoint.post(None, dummy_uaid) return self.finish_deferred def test_post_webpush_with_logged_stored(self): - import autopush.endpoint - log_patcher = patch.object(autopush.endpoint.log, "msg", spec=True) - mock_log = log_patcher.start() self.fernet_mock.decrypt.return_value = dummy_token self.endpoint.set_header = Mock() self.request_mock.headers["encryption"] = "stuff" @@ -1107,16 +1082,14 @@ def handle_finish(result): self.endpoint.set_status.assert_called_with(201) self.endpoint.set_header.assert_called_with( "Location", "Somewhere") - args, kwargs = mock_log.call_args + args, kwargs = self.endpoint.log.info.call_args eq_("Router miss, message stored.", args[0]) - log_patcher.stop() self.finish_deferred.addCallback(handle_finish) self.endpoint.post(None, dummy_uaid) return self.finish_deferred - @patch("twisted.python.log") - def test_post_db_error_in_routing(self, mock_log): + def test_post_db_error_in_routing(self): from autopush.router.interface import RouterException 
self.fernet_mock.decrypt.return_value = dummy_token self.endpoint.set_header = Mock() @@ -1207,8 +1180,7 @@ def test_cors_options(self): eq_(endpoint._headers[ch3], self.CORS_HEADERS) eq_(endpoint._headers[ch4], self.CORS_RESPONSE_HEADERS) - @patch_logger - def test_write_error(self, log_mock): + def test_write_error(self): """ Write error is triggered by sending the app a request with an invalid method (e.g. "put" instead of "PUT"). This is not code that is triggered within normal flow, but @@ -1224,10 +1196,9 @@ class testX(Exception): self.endpoint.write_error(999, exc_info=exc_info) self.status_mock.assert_called_with(999) - self.assertTrue(log_mock.err.called) + self.assertTrue(self.endpoint.log.called) - @patch_logger - def test_write_error_no_exc(self, log_mock): + def test_write_error_no_exc(self): """ Write error is triggered by sending the app a request with an invalid method (e.g. "put" instead of "PUT"). This is not code that is triggered within normal flow, but @@ -1235,7 +1206,7 @@ def test_write_error_no_exc(self, log_mock): """ self.endpoint.write_error(999) self.status_mock.assert_called_with(999) - self.assertTrue(log_mock.err.called) + self.assertTrue(self.endpoint.log.called) def _assert_error_response(self, result): self.status_mock.assert_called_with(500) diff --git a/autopush/tests/test_health.py b/autopush/tests/test_health.py index a90df3ec..8d683412 100644 --- a/autopush/tests/test_health.py +++ b/autopush/tests/test_health.py @@ -4,7 +4,7 @@ InternalServerError, ) from cyclone.web import Application -from mock import (Mock, patch) +from mock import Mock from moto import mock_dynamodb2 from twisted.internet.defer import Deferred from twisted.trial import unittest @@ -20,10 +20,10 @@ class HealthTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger self.timeout = 0.5 twisted.internet.base.DelayedCall.debug = True - self.log_mock = patch("autopush.health.log").start() self.mock_dynamodb2 = mock_dynamodb2() self.mock_dynamodb2.start() @@ -36,6 +36,7 @@ def setUp(self): self.request_mock = Mock() self.health = HealthHandler(Application(), self.request_mock) + self.health.log = self.log_mock = Mock(spec=Logger) self.status_mock = self.health.set_status = Mock() self.write_mock = self.health.write = Mock() @@ -44,7 +45,6 @@ def setUp(self): def tearDown(self): self.mock_dynamodb2.stop() - self.log_mock.stop() def test_healthy(self): return self._assert_reply({ diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index 95cb62b0..00537223 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -290,7 +290,7 @@ def ack(self, channel, version): def disconnect(self): self.ws.close() - def sleep(self, duration): + def sleep(self, duration): # pragma: nocover time.sleep(duration) def wait_for(self, func): diff --git a/autopush/tests/test_logging.py b/autopush/tests/test_logging.py index 27a29e22..85eb5945 100644 --- a/autopush/tests/test_logging.py +++ b/autopush/tests/test_logging.py @@ -7,9 +7,12 @@ from nose.tools import eq_, ok_ from twisted.internet import reactor from twisted.internet.defer import Deferred -from twisted.python import log, failure +from twisted.logger import Logger +from twisted.python import failure -from autopush.logging import setup_logging +from autopush.logging import PushLogger + +log = Logger() class SentryLogTestCase(twisted.trial.unittest.TestCase): @@ -25,10 +28,10 @@ def tearDown(self): def test_sentry_logging(self): os.environ["SENTRY_DSN"] = 
"some_locale" - setup_logging("Autopush") + PushLogger.setup_logging("Autopush", sentry_dsn=True) eq_(len(self.mock_raven.mock_calls), 2) - log.err(failure.Failure(Exception("eek"))) + log.failure("error", failure.Failure(Exception("eek"))) self.flushLoggedErrors() d = Deferred() @@ -43,21 +46,38 @@ def check(): return d -class EliotLogTestCase(twisted.trial.unittest.TestCase): +class PushLoggerTestCase(twisted.trial.unittest.TestCase): def test_custom_type(self): - setup_logging("Autopush") - with patch("sys.stdout") as mock_stdout: - log.msg("omg!", Type=7) - eq_(len(mock_stdout.mock_calls), 1) - kwargs = mock_stdout.mock_calls[0][1][0] + obj = PushLogger.setup_logging("Autopush") + obj._output = mock_stdout = Mock() + log.info("omg!", Type=7) + eq_(len(mock_stdout.mock_calls), 2) + kwargs = mock_stdout.mock_calls[0][1][0] ok_("Type" in kwargs) def test_human_logs(self): - setup_logging("Autopush", True) - with patch("sys.stdout") as mock_stdout: - mock_stdout.reset_mock() - log.msg("omg!", Type=7) - eq_(len(mock_stdout.mock_calls), 4) - mock_stdout.reset_mock() - log.err("wtf!", Type=7) - eq_(len(mock_stdout.mock_calls), 4) + obj = PushLogger.setup_logging("Autopush", log_format="text") + obj._output = mock_stdout = Mock() + log.info("omg!", Type=7) + eq_(len(mock_stdout.mock_calls), 2) + mock_stdout.reset_mock() + log.error("wtf!", Type=7) + eq_(len(mock_stdout.mock_calls), 2) + + def test_start_stop(self): + obj = PushLogger.setup_logging("Autopush") + obj.start() + obj.stop() + + def test_file_output(self): + try: + os.unlink("testfile.txt") + except: # pragma: nocover + pass + obj = PushLogger.setup_logging("Autoput", log_output="testfile.txt") + obj.start() + log.info("wow") + obj.stop() + with open("testfile.txt") as f: + lines = f.readlines() + eq_(len(lines), 1) diff --git a/autopush/tests/test_router.py b/autopush/tests/test_router.py index 29e2dd1f..3c14c594 100644 --- a/autopush/tests/test_router.py +++ b/autopush/tests/test_router.py @@ -8,7 +8,6 @@ from nose.tools import eq_, ok_ from twisted.trial import unittest from twisted.internet.error import ConnectError, ConnectionRefusedError -from twisted.python import log import apns import gcmclient @@ -82,6 +81,7 @@ def init(self, settings, router_conf): class APNSRouterTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger settings = AutopushSettings( hostname="localhost", statsd_host=None, @@ -90,6 +90,7 @@ def setUp(self): self.mock_apns = Mock(spec=apns.APNs) self.router = APNSRouter(settings, apns_config) self.router.apns = self.mock_apns + self.router.log = Mock(spec=Logger) self.notif = Notification(10, "data", dummy_chid, None, 200) self.router_data = dict(router_data=dict(token="connect_data")) @@ -430,12 +431,14 @@ def test_ammend(self): class SimplePushRouterTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger settings = AutopushSettings( hostname="localhost", statsd_host=None, ) self.router = SimpleRouter(settings, {}) + self.router.log = Mock(spec=Logger) self.notif = Notification(10, "data", dummy_chid, None, 200) mock_result = Mock(spec=gcmclient.gcm.Result) mock_result.canonical = dict() @@ -447,7 +450,6 @@ def setUp(self): self.agent_mock = Mock(spec=settings.agent) settings.agent = self.agent_mock self.router.metrics = Mock() - self.errors = [] def tearDown(self): dead_cache.clear() @@ -480,27 +482,17 @@ def verify_deliver(result): d.addBoth(verify_deliver) return d - def _mockObserver(self, event): - self.errors.append(event) - - def _contains_err(self, 
string): - for err in self.errors: - if string in err['message'][0]: - return True - return False # pragma: nocover - def test_route_connection_fail_saved(self): self.agent_mock.request.side_effect = MockAssist( [self._raise_connection_refused_error]) router_data = dict(node_id="http://somewhere", uaid=dummy_uaid) self.router_mock.clear_node.return_value = None self.router_mock.get_uaid.return_value = {} - log.addObserver(self._mockObserver) self.storage_mock.save_notification.return_value = True d = self.router.route_notification(self.notif, router_data) def verify_deliver(reply): - ok_(self._contains_err('ConnectionRefusedError')) + eq_(len(self.router.log.debug.mock_calls), 1) ok_(reply.status_code, 202) eq_(len(self.router_mock.clear_node.mock_calls), 1) diff --git a/autopush/tests/test_websocket.py b/autopush/tests/test_websocket.py index 7170bb3d..248e4839 100644 --- a/autopush/tests/test_websocket.py +++ b/autopush/tests/test_websocket.py @@ -47,8 +47,10 @@ def tearDown(): class WebsocketTestCase(unittest.TestCase): def setUp(self): + from twisted.logger import Logger twisted.internet.base.DelayedCall.debug = True self.proto = PushServerProtocol() + self.proto.log = Mock(spec=Logger) settings = AutopushSettings( hostname="localhost", @@ -104,7 +106,6 @@ def handle_message(result): return d def test_exc_catcher(self): - self.proto.log_err = Mock() req = Mock() def raise_error(*args, **kwargs): @@ -977,8 +978,6 @@ def test_unregister_with_webpush(self): assert self.proto.force_retry.called def test_ws_unregister(self): - patcher = patch("autopush.websocket.log", spec=True) - mock_log = patcher.start() self._connect() self._send_message(dict(messageType="hello", channelIDs=[])) @@ -989,8 +988,7 @@ def test_ws_unregister(self): def check_unregister_result(msg): eq_(msg["status"], 200) eq_(msg["channelID"], chid) - eq_(len(mock_log.mock_calls), 1) - patcher.stop() + eq_(len(self.proto.log.mock_calls), 1) d.callback(True) def check_hello_result(msg): @@ -1040,14 +1038,11 @@ def check_unregister_result(msg): return d def test_ws_unregister_fail(self): - patcher = patch('autopush.websocket.log', spec=True) - mock_log = patcher.start() self._connect() self.proto.ps.uaid = str(uuid.uuid4()) chid = str(uuid.uuid4()) d = Deferred() - d.addBoth(lambda x: patcher.stop()) # Replace storage delete with call to fail table = self.proto.ap_settings.storage.table @@ -1063,9 +1058,9 @@ def raise_exception(*args, **kwargs): channelID=chid)) def wait_for_times(): # pragma: nocover - if len(mock_log.mock_calls) > 0: + if len(self.proto.log.failure.mock_calls) > 0: try: - eq_(len(mock_log.mock_calls), 1) + eq_(len(self.proto.log.failure.mock_calls), 1) finally: d.callback(True) return @@ -1133,14 +1128,11 @@ def test_notification_avoid_newer_delivery(self): eq_(args, None) def test_ack(self): - patcher = patch('autopush.websocket.log', spec=True) - mock_log = patcher.start() self._connect() self._send_message(dict(messageType="hello", channelIDs=[])) d = Deferred() chid = str(uuid.uuid4()) - d.addBoth(lambda x: patcher.stop()) # stick a notification to ack in self.proto.ps.direct_updates[chid] = 12 @@ -1155,8 +1147,8 @@ def check_hello_result(msg): # Verify it was cleared out eq_(len(self.proto.ps.direct_updates), 0) - eq_(len(mock_log.mock_calls), 1) - args, kwargs = mock_log.msg.call_args + eq_(len(self.proto.log.info.mock_calls), 1) + args, kwargs = self.proto.log.info.call_args eq_(args[0], "Ack") eq_(kwargs["router_key"], "simplepush") eq_(kwargs["message_source"], "direct") @@ -1171,8 +1163,7 @@ def 
test_ack_with_bad_input(self): self._connect() eq_(self.proto.ack_update(None), None) - @patch('autopush.websocket.log', spec=True) - def test_ack_with_webpush_direct(self, mock_log): + def test_ack_with_webpush_direct(self): self._connect() self.proto.ps.uaid = str(uuid.uuid4()) chid = str(uuid.uuid4()) @@ -1188,14 +1179,13 @@ def test_ack_with_webpush_direct(self, mock_log): version="bleh:asdjfilajsdilfj" )) eq_(self.proto.ps.direct_updates[chid], []) - eq_(len(mock_log.mock_calls), 1) - args, kwargs = mock_log.msg.call_args + eq_(len(self.proto.log.info.mock_calls), 1) + args, kwargs = self.proto.log.info.call_args eq_(args[0], "Ack") eq_(kwargs["router_key"], "webpush") eq_(kwargs["message_source"], "direct") - @patch('autopush.websocket.log', spec=True) - def test_ack_with_webpush_from_storage(self, mock_log): + def test_ack_with_webpush_from_storage(self): self._connect() chid = str(uuid.uuid4()) self.proto.ps.use_webpush = True @@ -1214,14 +1204,13 @@ def test_ack_with_webpush_from_storage(self, mock_log): )) assert self.proto.force_retry.called assert mock_defer.addBoth.called - eq_(len(mock_log.mock_calls), 1) - args, kwargs = mock_log.msg.call_args + eq_(len(self.proto.log.info.mock_calls), 1) + args, kwargs = self.proto.log.info.call_args eq_(args[0], "Ack") eq_(kwargs["router_key"], "webpush") eq_(kwargs["message_source"], "stored") - @patch('autopush.websocket.log', spec=True) - def test_nack(self, mock_log): + def test_nack(self): self._connect() self.proto.ps.uaid = str(uuid.uuid4()) self.proto.onMessage(json.dumps(dict( @@ -1229,17 +1218,16 @@ def test_nack(self, mock_log): version="bleh:asdfhjklhjkl", code=200 )), False) - eq_(len(mock_log.mock_calls), 1) + eq_(len(self.proto.log.info.mock_calls), 1) - @patch('autopush.websocket.log', spec=True) - def test_nack_no_version(self, mock_log): + def test_nack_no_version(self): self._connect() self.proto.ps.uaid = str(uuid.uuid4()) self.proto.onMessage(json.dumps(dict( messageType="nack", code=200 )), False) - eq_(len(mock_log.mock_calls), 0) + eq_(len(self.proto.log.info.mock_calls), 0) def test_ack_remove(self): self._connect() @@ -1382,14 +1370,13 @@ def throw_error(*args, **kwargs): self.proto.ap_settings.storage = Mock( **{"fetch_notifications.side_effect": throw_error}) self.proto.ps._check_notifications = True - self.proto.log_err = Mock() self.proto.process_notifications() d = Deferred() def check_error(result): eq_(self.proto.ps._check_notifications, False) - ok_(self.proto.log_err.called) + ok_(self.proto.log.failure.called) d.callback(True) self.proto.ps._notification_fetch.addBoth(check_error) diff --git a/autopush/utils.py b/autopush/utils.py index 2f03597f..83403c09 100644 --- a/autopush/utils.py +++ b/autopush/utils.py @@ -8,7 +8,8 @@ import ecdsa from jose import jws -from twisted.python import failure, log +from twisted.logger import Logger +from twisted.python import failure default_ports = { @@ -102,7 +103,8 @@ def extract_jwt(token, crypto_key): return jws.verify(token, vk, algorithms=["ES256"]) -class ErrorLogger (object): +class ErrorLogger(object): + log = Logger() def write_error(self, code, **kwargs): """Write the error (otherwise unhandled exception when dealing with @@ -113,8 +115,9 @@ def write_error(self, code, **kwargs): """ self.set_status(code) if "exc_info" in kwargs: - log.err(failure.Failure(*kwargs["exc_info"]), - **self._client_info) + self.log.failure(failure.Failure(*kwargs["exc_info"]), + **self._client_info) else: - log.err("Error in handler: %s" % code, **self._client_info) + 
self.log.failure("Error in handler: %s" % code, + **self._client_info) self.finish() diff --git a/autopush/websocket.py b/autopush/websocket.py index 00fde0b1..62016d72 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -52,8 +52,9 @@ ) from twisted.internet.interfaces import IProducer from twisted.internet.threads import deferToThread +from twisted.logger import Logger from twisted.protocols import policies -from twisted.python import failure, log +from twisted.python import failure from zope.interface import implements from twisted.web.resource import Resource @@ -103,7 +104,7 @@ def wrapper(self, *args, **kwargs): try: return func(self, *args, **kwargs) except Exception: - self.log_err(failure.Failure()) + self.log_failure(failure.Failure()) return wrapper @@ -226,6 +227,7 @@ def stopProducing(self): class PushServerProtocol(WebSocketServerProtocol, policies.TimeoutMixin): """Main Websocket Connection Protocol""" + log = Logger() # Testing purposes parent_class = WebSocketServerProtocol @@ -280,7 +282,7 @@ def force_retry(self, func, *args, **kwargs): def wrapper(result, *w_args, **w_kwargs): if isinstance(result, failure.Failure): # This is an exception, log it - self.log_err(result) + self.log_failure(result) d = deferToThread(func, *args, **kwargs) d.addErrback(wrapper) @@ -295,9 +297,9 @@ def base_tags(self): bug""" return self.ps._base_tags if self.ps._base_tags else None - def log_err(self, failure, **kwargs): - """Log a twisted failure out through twisted's log.err""" - log.err(failure, **kwargs) + def log_failure(self, failure, **kwargs): + """Log a twisted failure out through twisted's log.failure""" + self.log.failure("Unexpected error", failure, **kwargs) @property def paused(self): @@ -499,7 +501,7 @@ def _save_webpush_notif(self, notif): message_id=notif.version, ttl=notif.ttl, timestamp=notif.timestamp, - ).addErrback(self.log_err) + ).addErrback(self.log_failure) def _save_simple_notif(self, channel_id, version): """Save a simplepush notification""" @@ -508,7 +510,7 @@ def _save_simple_notif(self, channel_id, version): uaid=self.ps.uaid, chid=channel_id, version=version, - ).addErrback(self.log_err) + ).addErrback(self.log_failure) def _lookup_node(self, results): """Looks up the node to send a notify for it to check storage if @@ -519,7 +521,8 @@ def _lookup_node(self, results): self.ps.uaid ) d.addCallback(self._notify_node) - d.addErrback(self.log_err, extra="Failed to get UAID for redeliver") + d.addErrback(self.log_failure, + extra="Failed to get UAID for redeliver") def _notify_node(self, result): """Checks the result of lookup node to send the notify if the client is @@ -543,7 +546,7 @@ def _notify_node(self, result): "PUT", url.encode("utf8"), ).addCallback(IgnoreBody.ignore) - d.addErrback(self.log_err, extra="Failed to notify node") + d.addErrback(self.log_failure, extra="Failed to notify node") def returnError(self, messageType, reason, statusCode, close=True): """Return an error to a client, and optionally shut down the connection @@ -657,7 +660,7 @@ def _register_user(self, existing_user=True): def err_hello(self, failure): """errBack for hello failures""" self.transport.resumeProducing() - self.log_err(failure) + self.log_failure(failure) self.returnError("hello", "error", 503) def _copy_new_data(self, result): @@ -732,7 +735,7 @@ def _check_other_nodes(self, result): ResponseNeverReceived, ConnectionLost, ConnectionDone)) - d.addErrback(self.log_err, + d.addErrback(self.log_failure, extra="Failed to delete old node") # UDP clients are done at 
this point and timed out to ensure they @@ -835,7 +838,7 @@ def process_notifications(self): def error_notifications(self, fail): """errBack for notification check failing""" # If we error'd out on this important check, we drop the connection - self.log_err(fail) + self.log_failure(fail) self.sendClose() def finish_notifications(self, notifs): @@ -933,7 +936,7 @@ def _rotate_message_table(self): d.addCallback(self._register_rotated_channels) d.addErrback(self.trap_cancel) d.addErrback(self.err_overload, "notif") - d.addErrback(self.log_err) + d.addErrback(self.log_failure) def _register_rotated_channels(self, result): """Register the channels into a new entry in the current month""" @@ -1052,8 +1055,9 @@ def process_unregister(self, data): # Log out the unregister if it has a code in it if "code" in data: code = extract_code(data) - log.msg("Unregister", channelID=chid, uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Unregister", channelID=chid, + uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) # Clear out any existing tracked messages for this channel if self.ps.use_webpush: @@ -1112,10 +1116,10 @@ def ver_filter(update): if found: msg = found[0] size = len(msg.data) if msg.data else 0 - log.msg("Ack", router_key="webpush", channelID=chid, - message_id=version, message_source="direct", - message_size=size, uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="webpush", channelID=chid, + message_id=version, message_source="direct", + message_size=size, uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) self.ps.direct_updates[chid].remove(msg) return @@ -1123,10 +1127,10 @@ def ver_filter(update): if found: msg = found[0] size = len(msg.data) if msg.data else 0 - log.msg("Ack", router_key="webpush", channelID=chid, - message_id=version, message_source="stored", - message_size=size, uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="webpush", channelID=chid, + message_id=version, message_source="stored", + message_size=size, uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) d = self.force_retry(self.ps.message.delete_message, uaid=self.ps.uaid, channel_id=chid, @@ -1156,15 +1160,15 @@ def _handle_simple_ack(self, chid, version, code): if chid in self.ps.direct_updates and \ self.ps.direct_updates[chid] <= version: del self.ps.direct_updates[chid] - log.msg("Ack", router_key="simplepush", channelID=chid, - message_id=version, message_source="direct", - uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="simplepush", channelID=chid, + message_id=version, message_source="direct", + uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) return - log.msg("Ack", router_key="simplepush", channelID=chid, - message_id=version, message_source="stored", - uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, code=code) + self.log.info("Ack", router_key="simplepush", channelID=chid, + message_id=version, message_source="stored", + uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, code=code) if chid in self.ps.updates_sent and \ self.ps.updates_sent[chid] <= version: del self.ps.updates_sent[chid] @@ -1199,9 +1203,9 @@ def process_nack(self, data): version, updateid = version.split(":") - log.msg("Nack", uaid_hash=self.ps.uaid_hash, - user_agent=self.ps.user_agent, message_id=version, - 
code=code) + self.log.info("Nack", uaid_hash=self.ps.uaid_hash, + user_agent=self.ps.user_agent, message_id=version, + code=code) def check_missed_notifications(self, results, resume=False): """Check to see if notifications were missed""" diff --git a/doc-requirements.txt b/doc-requirements.txt index f13824c4..a6351832 100644 --- a/doc-requirements.txt +++ b/doc-requirements.txt @@ -17,7 +17,6 @@ cyclone==1.1 datadog==0.5.0 decorator==4.0.0 ecdsa==0.13 -eliot==0.7.1 enum34==1.0.4 funcsigs==0.4 gcm-client==0.1.4 diff --git a/requirements.txt b/requirements.txt index 9cbc7d0a..55e50a50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,6 @@ cyclone==1.1 datadog==0.5.0 decorator==4.0.0 ecdsa==0.13 -eliot==0.11.0 enum34==1.0.4 funcsigs==0.4 gcm-client==0.1.4 diff --git a/test-requirements.txt b/test-requirements.txt index 0fb0ebb2..e3cb38a9 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -21,7 +21,6 @@ cyclone==1.1 datadog==0.5.0 decorator==4.0.0 ecdsa==0.13 -eliot==0.11.0 enum34==1.0.4 funcsigs==0.4 gcm-client==0.1.4
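
Usage note (not part of the patch): below is a minimal sketch of the new logging setup introduced in autopush/logging.py above. PushLogger.setup_logging() and the twisted.logger.Logger calls are taken from this diff; the demo_main() wrapper, the "Ack" message and the example field names are illustrative assumptions only.

from twisted.logger import Logger

from autopush.logging import PushLogger

log = Logger()  # module-level logger, mirroring autopush/main.py


def demo_main():
    # setup_logging() builds the observer, attaches it to the global log
    # publisher via start(), and returns it so the caller can stop() it later.
    logger = PushLogger.setup_logging(
        "Autopush",
        log_level="debug",    # events below this LogLevel are dropped
        log_format="json",    # "text" selects formatEventAsClassicLogText
        log_output="stdout",  # or a filename, opened on start()
        sentry_dsn=False,     # True wires captured failures into raven/Sentry
    )
    try:
        # Keyword arguments become structured fields in the JSON "Fields" map.
        log.info("Ack", router_key="webpush", message_source="direct")
        try:
            1 / 0
        except ZeroDivisionError:
            # log.failure() replaces twisted.python.log.err(); with no
            # explicit Failure argument it captures the active exception.
            log.failure("Unexpected error while acking")
    finally:
        logger.stop()


if __name__ == "__main__":
    demo_main()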
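
The handlers, routers, and tests above all follow the same pattern: a class-level twisted.logger.Logger replaces the module-level twisted.python.log import, and tests swap that attribute for a Mock instead of patching a module global. A hedged sketch of that pattern, with HypotheticalRouter standing in for GCMRouter/APNSRouter (the class and test names here are invented for illustration):

from mock import Mock
from twisted.logger import Logger


class HypotheticalRouter(object):
    """Stand-in for the routers above; the Logger lives on the class."""
    log = Logger()

    def route(self, reg_id):
        # Structured "{field}" formatting replaces "%s" string interpolation.
        self.log.debug("Routing to {reg_id}", reg_id=reg_id)


def test_route_logs_debug():
    router = HypotheticalRouter()
    # As in test_router.py / test_endpoint.py: replace the class attribute on
    # the instance rather than patching a module-level ``log``.
    router.log = Mock(spec=Logger)
    router.route("abc123")
    assert len(router.log.debug.mock_calls) == 1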