From 0d23766c207f8d3af9f2f89d1768fc5e106d0ce8 Mon Sep 17 00:00:00 2001 From: Simon Willison Date: Tue, 30 Jun 2020 17:29:54 -0700 Subject: [PATCH] Don't CSRF protect Authorization: Bearer ..., refs #11 --- README.md | 4 ++++ asgi_csrf.py | 8 ++++++++ test_asgi_csrf.py | 14 ++++++++++++++ 3 files changed, 26 insertions(+) diff --git a/README.md b/README.md index 96a1f38..d76cf4d 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,10 @@ If it cannot find that environment variable, it will generate a random secret wh This means that if you do not configure a specific secret your user's `csrftoken` cookies will become invalid every time the server restarts! You should configure a secret. +## Other cases that skip CSRF protection + +* If the request includes an `Authorization: Bearer ...` header, commonly used by OAuth and JWT authentication, the request will not be required to include a CSRF token. This is because browsers cannot send those headers in a context that can be abused. + ## Limitations * Currently only works for `application/x-www-form-urlencoded` forms, not `multipart/form-data` forms (with file uploads) diff --git a/asgi_csrf.py b/asgi_csrf.py index 5e3102c..d2dc022 100644 --- a/asgi_csrf.py +++ b/asgi_csrf.py @@ -105,6 +105,14 @@ async def wrapped_send(event): # x-csrftoken header matches await app(scope, receive, wrapped_send) return + # Authorization: Bearer skips CSRF check + if ( + headers.get(b"authorization", b"") + .decode("latin-1") + .startswith("Bearer ") + ): + await app(scope, receive, wrapped_send) + return # We need to look for it in the POST body content_type = headers.get(b"content-type", b"").split(b";", 1)[0] if content_type == b"application/x-www-form-urlencoded": diff --git a/test_asgi_csrf.py b/test_asgi_csrf.py index ea0f2b4..4db4aed 100644 --- a/test_asgi_csrf.py +++ b/test_asgi_csrf.py @@ -167,3 +167,17 @@ async def test_multipart_not_supported(csrftoken): files={"csv": ("data.csv", "blah,foo\n1,2", "text/csv")}, cookies={"csrftoken": csrftoken}, ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "authorization,expected_status", [("Bearer xxx", 200), ("Basic xxx", 403)] +) +async def test_post_with_authorization(authorization, expected_status): + async with httpx.AsyncClient( + app=asgi_csrf(hello_world_app, signing_secret=SECRET) + ) as client: + response = await client.post( + "http://localhost/", headers={"Authorization": authorization} + ) + assert expected_status == response.status_code