diff --git a/autopush/http.py b/autopush/http.py index 2ec30fdf..c2ee9d37 100644 --- a/autopush/http.py +++ b/autopush/http.py @@ -92,7 +92,9 @@ def for_handler(cls, handler_cls, *args, **kwargs): thrown. """ - for pattern, handler in cls.ap_handlers: + if 'handlers' in kwargs: # pragma: nocover + raise ValueError("handler_cls incompatibile with handlers kwarg") + for pattern, handler in cls.ap_handlers + cls.health_ap_handlers: if handler is handler_cls: return cls(handlers=[(pattern, handler)], *args, **kwargs) raise ValueError("{!r} not in ap_handlers".format( diff --git a/autopush/logging.py b/autopush/logging.py index fb025abd..fd8820c1 100644 --- a/autopush/logging.py +++ b/autopush/logging.py @@ -60,15 +60,19 @@ began_logging = False -def begin_or_register(observer): - # type: (Any) -> None +def begin_or_register(observer, redirectStandardIO=False, **kwargs): + # type: (Any, bool, **Any) -> None """Register observer with the global LogPublisher Registers via the global LogBeginner the first time called. """ global began_logging if not began_logging: - globalLogBeginner.beginLoggingTo([observer], redirectStandardIO=False) + globalLogBeginner.beginLoggingTo( + [observer], + redirectStandardIO=redirectStandardIO, + **kwargs + ) began_logging = True else: globalLogPublisher.addObserver(observer) diff --git a/autopush/tests/support.py b/autopush/tests/support.py new file mode 100644 index 00000000..4336155a --- /dev/null +++ b/autopush/tests/support.py @@ -0,0 +1,30 @@ +from twisted.logger import ILogObserver +from zope.interface import implementer + + +@implementer(ILogObserver) +class TestingLogObserver(object): + def __init__(self): + self._events = [] + + def __call__(self, event): + self._events.append(event) + + def __len__(self): + return len(self._events) + + def logged(self, predicate): + """Determine if any log events satisfy the callable""" + assert callable(predicate) + return any(predicate(e) for e in self._events) + + def logged_ci(self, predicate): + """Determine if any log client_infos satisfy the callable""" + assert callable(predicate) + return self.logged( + lambda e: 'client_info' in e and predicate(e['client_info'])) + + def logged_session(self): + """Extract the last logged session""" + return filter(lambda e: e["log_format"] == "Session", + self._events)[-1] diff --git a/autopush/tests/test_health.py b/autopush/tests/test_health.py index 5505b702..3cb6c3d8 100644 --- a/autopush/tests/test_health.py +++ b/autopush/tests/test_health.py @@ -1,25 +1,33 @@ +import json + import twisted.internet.base from boto.dynamodb2.exceptions import InternalServerError from cyclone.web import Application from mock import Mock from moto import mock_dynamodb2 -from twisted.internet.defer import Deferred +from nose.tools import eq_ +from twisted.internet.defer import inlineCallbacks +from twisted.logger import globalLogPublisher from twisted.trial import unittest from autopush import __version__ from autopush.exceptions import MissingTableException +from autopush.http import EndpointHTTPFactory +from autopush.logging import begin_or_register from autopush.settings import AutopushSettings +from autopush.tests.client import Client +from autopush.tests.support import TestingLogObserver from autopush.web.health import HealthHandler, StatusHandler class HealthTestCase(unittest.TestCase): def setUp(self): - from twisted.logger import Logger self.timeout = 0.5 twisted.internet.base.DelayedCall.debug = True self.mock_dynamodb2 = mock_dynamodb2() self.mock_dynamodb2.start() + self.addCleanup(self.mock_dynamodb2.stop) settings = AutopushSettings( hostname="localhost", @@ -28,22 +36,17 @@ def setUp(self): self.router_table = settings.router.table self.storage_table = settings.storage.table - self.request_mock = Mock() - self.health = HealthHandler(Application(), - self.request_mock, - ap_settings=settings) - self.health.log = self.log_mock = Mock(spec=Logger) - self.status_mock = self.health.set_status = Mock() - self.write_mock = self.health.write = Mock() - - d = self.finish_deferred = Deferred() - self.health.finish = lambda: d.callback(True) + # ignore logging + logs = TestingLogObserver() + begin_or_register(logs) + self.addCleanup(globalLogPublisher.removeObserver, logs) - def tearDown(self): - self.mock_dynamodb2.stop() + app = EndpointHTTPFactory.for_handler(HealthHandler, settings) + self.client = Client(app) + @inlineCallbacks def test_healthy(self): - return self._assert_reply({ + yield self._assert_reply({ "status": "OK", "version": __version__, "clients": 0, @@ -51,6 +54,7 @@ def test_healthy(self): "router": {"status": "OK"} }) + @inlineCallbacks def test_aws_error(self): def raise_error(*args, **kwargs): raise InternalServerError(None, None) @@ -59,7 +63,7 @@ def raise_error(*args, **kwargs): self.storage_table.connection.list_tables = Mock( return_value={"TableNames": ["storage"]}) - return self._assert_reply({ + yield self._assert_reply({ "status": "NOT OK", "version": __version__, "clients": 0, @@ -70,12 +74,13 @@ def raise_error(*args, **kwargs): } }, InternalServerError) + @inlineCallbacks def test_nonexistent_table(self): no_tables = Mock(return_value={"TableNames": []}) self.storage_table.connection.list_tables = no_tables self.router_table.connection.list_tables = no_tables - return self._assert_reply({ + yield self._assert_reply({ "status": "NOT OK", "version": __version__, "clients": 0, @@ -89,6 +94,7 @@ def test_nonexistent_table(self): } }, MissingTableException) + @inlineCallbacks def test_internal_error(self): def raise_error(*args, **kwargs): raise Exception("synergies not aligned") @@ -97,7 +103,7 @@ def raise_error(*args, **kwargs): self.storage_table.connection.list_tables = Mock( side_effect=raise_error) - return self._assert_reply({ + yield self._assert_reply({ "status": "NOT OK", "version": __version__, "clients": 0, @@ -108,16 +114,14 @@ def raise_error(*args, **kwargs): "router": {"status": "OK"} }, Exception) + @inlineCallbacks def _assert_reply(self, reply, exception=None): - def handle_finish(result): - if exception: - self.status_mock.assert_called_with(503, reason=None) - self.flushLoggedErrors(exception) - self.write_mock.assert_called_with(reply) - self.finish_deferred.addCallback(handle_finish) - - self.health.get() - return self.finish_deferred + resp = yield self.client.get('/health') + if exception: + eq_(resp.get_status(), 503) + self.flushLoggedErrors(exception) + payload = json.loads(resp.content) + eq_(payload, reply) class StatusTestCase(unittest.TestCase): @@ -127,6 +131,7 @@ def setUp(self): self.mock_dynamodb2 = mock_dynamodb2() self.mock_dynamodb2.start() + self.addCleanup(self.mock_dynamodb2.stop) settings = AutopushSettings( hostname="localhost", @@ -137,9 +142,6 @@ def setUp(self): ap_settings=settings) self.write_mock = self.status.write = Mock() - def tearDown(self): - self.mock_dynamodb2.stop() - def test_status(self): self.status.get() self.write_mock.assert_called_with({ diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index e1752237..eec68686 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -29,10 +29,7 @@ from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks, returnValue, Deferred from twisted.internet.threads import deferToThread -from twisted.logger import ( - globalLogPublisher, - ILogObserver -) +from twisted.logger import globalLogPublisher from twisted.test.proto_helpers import AccumulatingProtocol from twisted.trial import unittest from twisted.web.client import Agent, FileBodyProducer @@ -50,6 +47,7 @@ from autopush.settings import AutopushSettings from autopush.utils import base64url_encode from autopush.metrics import SinkMetrics +from autopush.tests.support import TestingLogObserver from autopush.websocket import PushServerFactory log = logging.getLogger(__name__) @@ -63,31 +61,6 @@ twisted.internet.base.DelayedCall.debug = True -@implementer(ILogObserver) -class TestingLogObserver(object): - def __init__(self): - self._events = [] - - def __call__(self, event): - self._events.append(event) - - def logged(self, predicate): - """Determine if any log events satisfy the callable""" - assert callable(predicate) - return any(predicate(e) for e in self._events) - - def logged_ci(self, predicate): - """Determine if any log client_infos satisfy the callable""" - assert callable(predicate) - return self.logged( - lambda e: 'client_info' in e and predicate(e['client_info'])) - - def logged_session(self): - """Extract the last logged session""" - return filter(lambda e: e["log_format"] == "Session", - self._events)[-1] - - def setUp(): logging.getLogger('boto').setLevel(logging.CRITICAL) boto_path = os.path.join(root_dir, "automock", "boto.cfg") @@ -473,7 +446,6 @@ def client_SSLCF(self, certfile): from twisted.internet.ssl import ( Certificate, PrivateCertificate, optionsForClientTLS) from twisted.web.iweb import IPolicyForHTTPS - from zope.interface import implementer with open(self.servercert) as fp: servercert = Certificate.loadPEM(fp.read()) diff --git a/autopush/tests/test_log_check.py b/autopush/tests/test_log_check.py index 2c50c0a6..9eac8697 100644 --- a/autopush/tests/test_log_check.py +++ b/autopush/tests/test_log_check.py @@ -1,15 +1,18 @@ import json import twisted -from cyclone.web import Application -from mock import Mock from moto import mock_dynamodb2 -from nose.tools import eq_ -from twisted.internet.defer import Deferred +from nose.tools import eq_, ok_ +from twisted.internet.defer import inlineCallbacks +from twisted.logger import globalLogPublisher from twisted.trial import unittest from autopush.db import create_rotating_message_table +from autopush.http import EndpointHTTPFactory +from autopush.logging import begin_or_register from autopush.settings import AutopushSettings +from autopush.tests.client import Client +from autopush.tests.support import TestingLogObserver from autopush.web.log_check import LogCheckHandler mock_dynamodb2 = mock_dynamodb2() @@ -28,48 +31,44 @@ class LogCheckTestCase(unittest.TestCase): def setUp(self): twisted.internet.base.DelayedCall.debug = True - from twisted.logger import Logger settings = AutopushSettings( hostname="localhost", statsd_host=None, ) - self.request_mock = Mock(body=b'', arguments={}, - headers={"ttl": "0"}, - host='example.com:8080') - self.lch = LogCheckHandler(Application(), - self.request_mock, - ap_settings=settings) - - self.finish_deferred = Deferred() - self.lch.finish = lambda: self.finish_deferred.callback(True) - self.lch.set_status = Mock() - self.lch.write = Mock() - self.lch.log = Mock(spec=Logger) - def test_get_err(self): - - def handle_finish(value): - call_args = self.lch.log.error.call_args[1] - eq_(call_args.get('format'), 'Test Error Message') - eq_(call_args.get('status_code'), 418) - write_arg = json.loads(self.lch.write.call_args[0][0]) - eq_(write_arg.get('code'), 418) - eq_(write_arg.get('message'), "ERROR:Success") + self.logs = TestingLogObserver() + begin_or_register(self.logs, discardBuffer=True) + self.addCleanup(globalLogPublisher.removeObserver, self.logs) - self.finish_deferred.addCallback(handle_finish) - self.lch.get(None) - return self.finish_deferred + app = EndpointHTTPFactory.for_handler(LogCheckHandler, settings) + self.client = Client(app) + @inlineCallbacks + def test_get_err(self): + resp = yield self.client.get('/v1/err') + eq_(len(self.logs), 2) + ok_(self.logs.logged( + lambda e: (e['log_level'].name == 'error' and + e['log_format'] == 'Test Error Message' and + e['status_code'] == 418) + )) + payload = json.loads(resp.content) + eq_(payload.get('code'), 418) + eq_(payload.get('message'), "ERROR:Success") + + @inlineCallbacks def test_get_crit(self): - - def handle_finish(value): - call_args = self.lch.log.failure.call_args[1] - eq_(call_args.get('status_code'), 418) - write_args = json.loads(self.lch.write.call_args[0][0]) - eq_(write_args.get('code'), 418) - eq_(write_args.get('error'), 'Test Failure') - - self.finish_deferred.addCallback(handle_finish) - self.lch.get(err_type='CRIT') - return self.finish_deferred + resp = yield self.client.get('/v1/err/crit') + eq_(len(self.logs), 2) + ok_(self.logs.logged( + lambda e: (e['log_level'].name == 'critical' and + e['log_failure'] and + e['log_format'] == 'Test Critical Message' and + e['status_code'] == 418) + )) + payload = json.loads(resp.content) + eq_(payload.get('code'), 418) + eq_(payload.get('error'), "Test Failure") + + self.flushLoggedErrors() diff --git a/autopush/tests/test_web_validation.py b/autopush/tests/test_web_validation.py index c8d7ecdb..c1fb0b1a 100644 --- a/autopush/tests/test_web_validation.py +++ b/autopush/tests/test_web_validation.py @@ -14,7 +14,7 @@ from mock import Mock, patch from moto import mock_dynamodb2 from nose.tools import eq_, ok_, assert_raises -from twisted.internet.defer import Deferred +from twisted.internet.defer import inlineCallbacks from twisted.trial import unittest from autopush.db import create_rotating_message_table @@ -112,42 +112,26 @@ def test_call_func_error(self): self._mock_errors.assert_called() eq_(len(mock_func.mock_calls), 0) + @inlineCallbacks def test_decorator(self): - from cyclone.web import RequestHandler - from autopush.web.base import threaded_validate + from cyclone.web import Application + from autopush.web.base import BaseWebHandler, threaded_validate + from autopush.tests.client import Client schema = self._make_basic_schema() - class AHandler(RequestHandler): + class AHandler(BaseWebHandler): + def authenticate_peer_cert(self): + pass + @threaded_validate(schema) def get(self): self.write("done") self.finish() - req = self._make_dummy_request() - app = Mock() - app.ui_modules = dict() - app.ui_methods = dict() - vr = AHandler(app, req) - vr._timings = dict() - d = Deferred() - vr.finish = lambda: d.callback(True) - vr.write = Mock() - vr._overload_err = Mock() - vr._boto_err = Mock() - vr._validation_err = Mock() - vr._response_err = Mock() - vr.ap_settings = Mock() - - e = Deferred() - - def check_result(result): - vr.write.assert_called_with("done") - e.callback(True) - - d.addCallback(check_result) - - vr.get() - return e + app = Application([('/test', AHandler, dict(ap_settings=Mock()))]) + client = Client(app) + resp = yield client.get('/test') + eq_(resp.content, "done") class TestSimplePushRequestSchema(unittest.TestCase): diff --git a/autopush/tests/test_web_webpush.py b/autopush/tests/test_web_webpush.py index f3942456..c20aec4c 100644 --- a/autopush/tests/test_web_webpush.py +++ b/autopush/tests/test_web_webpush.py @@ -2,17 +2,17 @@ import uuid from cryptography.fernet import Fernet -from cyclone.web import Application from mock import Mock from moto import mock_dynamodb2 from nose.tools import eq_, ok_ -from twisted.internet.defer import Deferred -from twisted.logger import Logger +from twisted.internet.defer import inlineCallbacks from twisted.trial import unittest from autopush.db import Router, create_rotating_message_table +from autopush.http import EndpointHTTPFactory from autopush.router.interface import IRouter, RouterResponse from autopush.settings import AutopushSettings +from autopush.tests.client import Client dummy_uaid = str(uuid.UUID("abad1dea00000000aabbccdd00000000")) dummy_chid = str(uuid.UUID("deadbeef00000000decafbad00000000")) @@ -33,31 +33,25 @@ class TestWebpushHandler(unittest.TestCase): def setUp(self): from autopush.web.webpush import WebPushHandler - settings = AutopushSettings( + self.ap_settings = settings = AutopushSettings( hostname="localhost", statsd_host=None, ) self.fernet_mock = settings.fernet = Mock(spec=Fernet) - self.ap_settings = settings - self.router_mock = settings.router = Mock(spec=Router) - self.request_mock = Mock(body=b'', arguments={}, - headers={"ttl": "0"}, - host='example.com:8080') - - self.wp = WebPushHandler(Application(), - self.request_mock, - ap_settings=settings) - self.status_mock = self.wp.set_status = Mock() - self.write_mock = self.wp.write = Mock() - self.wp.log = Mock(spec=Logger) - d = self.finish_deferred = Deferred() - self.wp.finish = lambda: d.callback(True) settings.routers["webpush"] = Mock(spec=IRouter) self.wp_router_mock = settings.routers["webpush"] + self.message_mock = settings.message = Mock() self.message_mock.all_channels.return_value = (True, [dummy_chid]) + app = EndpointHTTPFactory.for_handler(WebPushHandler, settings) + self.client = Client(app) + + def url(self, **kwargs): + return '/wpush/{api_ver}/{token}'.format(**kwargs) + + @inlineCallbacks def test_router_needs_update(self): self.ap_settings.parse_endpoint = Mock(return_value=dict( uaid=dummy_uaid, @@ -76,18 +70,15 @@ def test_router_needs_update(self): router_data=dict(token="new_connect"), ) - def handle_finish(result): - eq_(result, True) - self.wp.set_status.assert_called_with(503, reason=None) - ru = self.router_mock.register_user - ok_(ru.called) - eq_('webpush', ru.call_args[0][0].get('router_type')) - - self.finish_deferred.addCallback(handle_finish) - - self.wp.post(api_ver="v1", token=dummy_token) - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver="v1", token=dummy_token), + ) + eq_(resp.get_status(), 503) + ru = self.router_mock.register_user + ok_(ru.called) + eq_('webpush', ru.call_args[0][0].get('router_type')) + @inlineCallbacks def test_router_returns_data_without_detail(self): self.ap_settings.parse_endpoint = Mock(return_value=dict( uaid=dummy_uaid, @@ -106,96 +97,78 @@ def test_router_returns_data_without_detail(self): router_data=dict(), ) - def handle_finish(result): - eq_(result, True) - self.wp.set_status.assert_called_with(503, reason=None) - ok_(self.router_mock.drop_user.called) - - self.finish_deferred.addCallback(handle_finish) - - self.wp.post("v1", dummy_token) - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver="v1", token=dummy_token), + ) + eq_(resp.get_status(), 503) + ok_(self.router_mock.drop_user.called) + @inlineCallbacks def test_request_bad_ckey(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(404, reason=None) - - self.finish_deferred.addCallback(handle_finish) self.fernet_mock.decrypt.return_value = 'invalid key' - self.request_mock.headers['crypto-key'] = 'dummy_key' - self.wp.post(token='ignored', api_ver='v1') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver="v1", token='ignored'), + headers={'crypto-key': 'dummy_key'} + ) + eq_(resp.get_status(), 404) + @inlineCallbacks def test_request_bad_v1_id(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(404, reason=None) - - self.finish_deferred.addCallback(handle_finish) self.fernet_mock.decrypt.return_value = 'tooshort' - self.wp.post(token='ignored', api_ver='v1') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver="v1", token='ignored'), + ) + eq_(resp.get_status(), 404) + @inlineCallbacks def test_request_bad_v2_id_short(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(404, reason=None) - - self.finish_deferred.addCallback(handle_finish) self.fernet_mock.decrypt.return_value = 'tooshort' - self.request_mock.headers['authorization'] = 'vapid t=dummy_key,k=aaa' - self.wp.post(token='ignored', api_ver='v2') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v2', token='ignored'), + headers={'authorization': 'vapid t=dummy_key,k=aaa'} + ) + eq_(resp.get_status(), 404) + @inlineCallbacks def test_request_bad_draft02_auth(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(401, reason=None) - - self.finish_deferred.addCallback(handle_finish) - self.request_mock.headers['authorization'] = 'vapid foo' - self.wp.post(token='ignored', api_ver='v2') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v2', token='ignored'), + headers={'authorization': 'vapid foo'} + ) + eq_(resp.get_status(), 401) + @inlineCallbacks def test_request_bad_draft02_missing_key(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(401, reason=None) - self.fernet_mock.decrypt.return_value = 'a' * 64 - self.finish_deferred.addCallback(handle_finish) - self.request_mock.headers['authorization'] = ( - 'vapid t=dummy.key.value,k=') - self.wp.post(token='ignored', api_ver='v2') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v2', token='ignored'), + headers={'authorization': 'vapid t=dummy.key.value,k='} + ) + eq_(resp.get_status(), 401) + @inlineCallbacks def test_request_bad_draft02_bad_pubkey(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(401, reason=None) - self.fernet_mock.decrypt.return_value = 'a' * 64 - self.finish_deferred.addCallback(handle_finish) - self.request_mock.headers['authorization'] = ( - 'vapid t=dummy.key.value,k=!aaa') - self.wp.post(token='ignored', api_ver='v2') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v2', token='ignored'), + headers={'authorization': 'vapid t=dummy.key.value,k=!aaa'} + ) + eq_(resp.get_status(), 401) + @inlineCallbacks def test_request_bad_v2_id_missing_pubkey(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(401, reason=None) - - self.finish_deferred.addCallback(handle_finish) self.fernet_mock.decrypt.return_value = 'a' * 64 - self.request_mock.headers['crypto-key'] = 'key_id=dummy_key' - self.request_mock.headers['authorization'] = 'dummy_key' - self.wp.post(token='ignored', api_ver='v2') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v2', token='ignored'), + headers={'crypto-key': 'key_id=dummy_key', + 'authorization': 'dummy_key'} + ) + eq_(resp.get_status(), 401) + @inlineCallbacks def test_request_v2_id_variant_pubkey(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(401, reason=None) - - self.finish_deferred.addCallback(handle_finish) self.fernet_mock.decrypt.return_value = 'a' * 32 variant_key = base64.urlsafe_b64encode("0V0" + ('a' * 85)) - self.request_mock.headers['crypto-key'] = 'p256ecdsa=' + variant_key - self.request_mock.headers['authorization'] = 'webpush dummy.key' self.ap_settings.router.get_uaid = Mock() self.ap_settings.router.get_uaid.return_value = dict( uaid=dummy_uaid, @@ -203,16 +176,16 @@ def handle_finish(result): router_type="gcm", router_data=dict(creds=dict(senderID="bogus")), ) - self.wp.post(token='ignored', api_ver='v1') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v1', token='ignored'), + headers={'crypto-key': 'p256ecdsa=' + variant_key, + 'authorization': 'webpush dummy.key'} + ) + eq_(resp.get_status(), 401) + @inlineCallbacks def test_request_v2_id_no_crypt_auth(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(401, reason=None) - - self.finish_deferred.addCallback(handle_finish) self.fernet_mock.decrypt.return_value = 'a' * 32 - self.request_mock.headers['authorization'] = 'webpush dummy.key' self.ap_settings.router.get_uaid = Mock() self.ap_settings.router.get_uaid.return_value = dict( uaid=dummy_uaid, @@ -220,16 +193,18 @@ def handle_finish(result): router_type="gcm", router_data=dict(creds=dict(senderID="bogus")), ) - self.wp.post(token='ignored', api_ver='v1') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v1', token='ignored'), + headers={'authorization': 'webpush dummy.key'} + ) + eq_(resp.get_status(), 401) + @inlineCallbacks def test_request_bad_v2_id_bad_pubkey(self): - def handle_finish(result): - self.wp.set_status.assert_called_with(401, reason=None) - - self.finish_deferred.addCallback(handle_finish) self.fernet_mock.decrypt.return_value = 'a' * 64 - self.request_mock.headers['crypto-key'] = 'p256ecdsa=Invalid!' - self.request_mock.headers['authorization'] = 'dummy_key' - self.wp.post(token='ignored', api_ver='v2') - return self.finish_deferred + resp = yield self.client.post( + self.url(api_ver='v2', token='ignored'), + headers={'crypto-key': 'p256ecdsa=Invalid!', + 'authorization': 'dummy_key'} + ) + eq_(resp.get_status(), 401) diff --git a/mypy.ini b/mypy.ini index b661fc97..b55f4c43 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,6 +1,5 @@ [mypy] python_version = 2.7 -fast_parser = true ignore_missing_imports = true follow_imports = skip warn_unused_ignores = true