From c91b4c2a01bbf8dd41d521a29710470c8b73599b Mon Sep 17 00:00:00 2001 From: Aymeric Augustin Date: Sun, 23 May 2021 18:51:27 +0200 Subject: [PATCH] Use constant-time comparison for passwords. --- docs/project/changelog.rst | 2 ++ src/websockets/legacy/auth.py | 28 +++++++++++++++------------- tests/legacy/test_auth.py | 13 ++++++++++--- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/docs/project/changelog.rst b/docs/project/changelog.rst index c2c869c13..a44dd0418 100644 --- a/docs/project/changelog.rst +++ b/docs/project/changelog.rst @@ -45,6 +45,8 @@ They may change at any time. * Optimized default compression settings to reduce memory usage. +* Protected against timing attacks on HTTP Basic Auth. + * Made it easier to customize authentication with :meth:`~auth.BasicAuthWebSocketServerProtocol.check_credentials`. diff --git a/src/websockets/legacy/auth.py b/src/websockets/legacy/auth.py index e7e69cac1..16016e6fd 100644 --- a/src/websockets/legacy/auth.py +++ b/src/websockets/legacy/auth.py @@ -6,6 +6,7 @@ import functools +import hmac import http from typing import Any, Awaitable, Callable, Iterable, Optional, Tuple, Union, cast @@ -154,24 +155,23 @@ def basic_auth_protocol_factory( if credentials is not None: if is_credentials(credentials): - - async def check_credentials(username: str, password: str) -> bool: - return (username, password) == credentials - + credentials_list = [cast(Credentials, credentials)] elif isinstance(credentials, Iterable): credentials_list = list(credentials) - if all(is_credentials(item) for item in credentials_list): - credentials_dict = dict(credentials_list) - - async def check_credentials(username: str, password: str) -> bool: - return credentials_dict.get(username) == password - - else: + if not all(is_credentials(item) for item in credentials_list): raise TypeError(f"invalid credentials argument: {credentials}") - else: raise TypeError(f"invalid credentials argument: {credentials}") + credentials_dict = dict(credentials_list) + + async def check_credentials(username: str, password: str) -> bool: + try: + expected_password = credentials_dict[username] + except KeyError: + return False + return hmac.compare_digest(expected_password, password) + if create_protocol is None: # Not sure why mypy cannot figure this out. create_protocol = cast( @@ -180,5 +180,7 @@ async def check_credentials(username: str, password: str) -> bool: ) return functools.partial( - create_protocol, realm=realm, check_credentials=check_credentials + create_protocol, + realm=realm, + check_credentials=check_credentials, ) diff --git a/tests/legacy/test_auth.py b/tests/legacy/test_auth.py index c4dbd88ad..2b670c31f 100644 --- a/tests/legacy/test_auth.py +++ b/tests/legacy/test_auth.py @@ -1,3 +1,4 @@ +import hmac import unittest import urllib.error @@ -27,7 +28,7 @@ async def process_request(self, path, request_headers): class CheckWebSocketServerProtocol(BasicAuthWebSocketServerProtocol): async def check_credentials(self, username, password): - return password == "letmein" + return hmac.compare_digest(password, "letmein") class AuthClientServerTests(ClientServerTestsMixin, AsyncioTestCase): @@ -81,7 +82,7 @@ def test_basic_auth_bad_multiple_credentials(self): ) async def check_credentials(username, password): - return password == "iloveyou" + return hmac.compare_digest(password, "iloveyou") create_protocol_check_credentials = basic_auth_protocol_factory( realm="auth-tests", @@ -158,7 +159,13 @@ def test_basic_auth_unsupported_credentials_details(self): self.assertEqual(raised.exception.read().decode(), "Unsupported credentials\n") @with_server(create_protocol=create_protocol) - def test_basic_auth_invalid_credentials(self): + def test_basic_auth_invalid_username(self): + with self.assertRaises(InvalidStatusCode) as raised: + self.start_client(user_info=("goodbye", "iloveyou")) + self.assertEqual(raised.exception.status_code, 401) + + @with_server(create_protocol=create_protocol) + def test_basic_auth_invalid_password(self): with self.assertRaises(InvalidStatusCode) as raised: self.start_client(user_info=("hello", "ihateyou")) self.assertEqual(raised.exception.status_code, 401)