From d658544ecd00813af9f2d07f1fbe14d822ca76fb Mon Sep 17 00:00:00 2001 From: Daniel Sotirhos Date: Wed, 8 Mar 2023 17:07:06 -0800 Subject: [PATCH] Fix: Frame Injection (azul-private#12) --- src/azul/chalice.py | 34 ++++++ test/app_test_case.py | 6 + test/test_content_type.py | 245 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 285 insertions(+) create mode 100644 test/test_content_type.py diff --git a/src/azul/chalice.py b/src/azul/chalice.py index 7a0334fcbe..a3154745c9 100644 --- a/src/azul/chalice.py +++ b/src/azul/chalice.py @@ -1,6 +1,7 @@ from collections.abc import ( Iterable, ) +import html import json from json import ( JSONEncoder, @@ -36,6 +37,12 @@ from furl import ( furl, ) +from more_itertools import ( + only, +) +from werkzeug.http import ( + parse_accept_header, +) from azul import ( config, @@ -103,6 +110,7 @@ def __init__(self, self._specs: Optional[MutableJSON] = None super().__init__(app_name, debug=config.debug > 0, configure_logs=False) # Middleware is invoked in order of registration + self.register_middleware(self._html_wrapping_middleware, 'http') self.register_middleware(self._logging_middleware, 'http') self.register_middleware(self._hsts_header_middleware, 'http') self.register_middleware(self._lambda_context_middleware, 'all') @@ -137,6 +145,32 @@ def patched_event_source_handler(self_, event, context): if old_handler.__code__ != patched_event_source_handler.__code__: chalice.app.EventSourceHandler.__call__ = patched_event_source_handler + def _html_wrapping_middleware(self, event, get_response): + """ + Embed a `text/plain` response in HTML if the request favors `text/html`. + Any HTML fragments in the original response are escaped to counteract + HTML injection attacks. This doesn't fully prevent those attacks because + a broken, ancient user agent might still request `text/plain`, `*/*` or + nothing, ignore the `text/plain` content type, sniff the HTML fragment + and render it. It does, however, handle vulnerability scanners because + those do prefer `text/html`. + """ + response = get_response(event) + ct_key = only(k for k in response.headers if k.casefold() == 'content-type') + if ct_key and response.headers[ct_key] == 'text/plain': + parsed = parse_accept_header(event.headers.get('accept')) + text_html = parsed.find('text/html') + star_star = parsed.find('*/*') + if 0 <= text_html and (star_star < 0 or text_html < star_star): + response.body = ( + '' + f'

Status {response.status_code}

' + f'
{html.escape(json.dumps(response.body), quote=False)}
' + '' + ) + response.headers[ct_key] = 'text/html' + return response + def _logging_middleware(self, event, get_response): self._log_request() response = get_response(event) diff --git a/test/app_test_case.py b/test/app_test_case.py index 280ef8c4fa..9c1a06d9c8 100644 --- a/test/app_test_case.py +++ b/test/app_test_case.py @@ -6,6 +6,10 @@ Thread, ) import time +from typing import ( + Any, + ClassVar, +) # noinspection PyPackageRequirements from chalice.config import ( @@ -69,6 +73,8 @@ class LocalAppTestCase(CatalogTestCase, metaclass=ABCMeta): ElasticsearchTestCase. """ + app_module: ClassVar[Any] + @classmethod @abstractmethod def lambda_name(cls) -> str: diff --git a/test/test_content_type.py b/test/test_content_type.py new file mode 100644 index 0000000000..32a1eae7ae --- /dev/null +++ b/test/test_content_type.py @@ -0,0 +1,245 @@ +import re +from typing import ( + Any, + Callable, +) +from unittest import ( + mock, +) + +from chalice import ( + Chalice, + ChaliceUnhandledError, + NotFoundError, + Response, +) +import requests + +from app_test_case import ( + LocalAppTestCase, +) +from azul_test_case import ( + DCP2TestCase, +) + + +class TestContentType(LocalAppTestCase, DCP2TestCase): + + @classmethod + def lambda_name(cls) -> str: + return 'service' + + @classmethod + def setUpClass(cls): + super().setUpClass() + + @cls.app_module.app.route('/test') + def route(): + pass + + def _replace_handler(self, handler: Callable[[None], Any]): + """ + Replace the current handler for route `/test` with the provided handler + """ + route = '/test' + app = self.__class__.app_module.app + app.routes.pop(route) + app._register_handler(handler_type='route', + name='route', + user_handler=handler, + wrapped_handler=handler, + kwargs={'path': route, 'kwargs': {}}, + options=None) + + def _shrink_traceback(self, s: str) -> str: + """ + Return a modified version of the given traceback. The first and last + lines are kept, and everything inbetween is replaced with a single line + of '...'. + """ + if s.startswith('Traceback'): + lines = s.split('\n') + assert lines[-1] == '', s # since traceback ends with a '\n' + s = '\n'.join([lines[0], '...', lines[-2]]) + elif s.startswith(''): + # Assumes traceback is a json dumped string inside
 tags
+            pattern = re.compile(r'(
)'
+                                 r'("Traceback.*?\\n)'  # 1st line of traceback
+                                 r'.*\\n'  # middle lines
+                                 r'(.*)\\n'  # last line of traceback
+                                 r'("
)') + s = re.sub(pattern, r'\1\2...\\n\3\4', s) + return s + + def _test_route(self, + handler_fn: Callable[[None], Any], + expected_fn: Callable[[bool, bool], tuple[str, str]] + ): + """ + Verify the response against expected for requests made with various + types of `accept` header values. + """ + self._replace_handler(handler_fn) + for debug in (False, True): + with mock.patch.object(Chalice, 'debug', 1 if debug else 0): + for accept, expect_wrapped in [ + (None, False), + ('*/*', False), + ('*/*,text/html', False), + ('text/html', True), + ('text/html,*/*', True), + ('*/*;q=0.9,text/html', True), + ('text/html;q=0.9,*/*;q=1.0', False), + ]: + with self.subTest(debug=debug, accept=accept): + url = self.base_url.set(path=('test',)) + headers = {'accept': accept} + response = requests.get(url, headers=headers) + response_text = self._shrink_traceback(response.text) + expected_text, expected_content_type = expected_fn(debug, expect_wrapped) + self.assertEqual(expected_text, response_text) + self.assertEqual(expected_content_type, response.headers['Content-Type']) + + def test_string(self): + + def route(): + return '