From f874efb224fd2133793234ff924d7e445f013cc9 Mon Sep 17 00:00:00 2001 From: Jason Hu Date: Sat, 30 Jun 2018 19:31:36 -0700 Subject: [PATCH] By default to use access_token if hass.auth.active (#15212) * Force to use access_token if hass.auth.active * Not allow Basic auth with api_password if hass.auth.active * Block websocket api_password auth when hass.auth.active * Add legacy_api_password auth provider * lint * lint --- homeassistant/auth.py | 14 +- .../auth_providers/legacy_api_password.py | 104 +++++++++++++ homeassistant/components/http/__init__.py | 17 +- homeassistant/components/http/auth.py | 66 ++++---- homeassistant/components/websocket_api.py | 24 +-- .../test_legacy_api_password.py | 67 ++++++++ tests/components/http/test_auth.py | 147 ++++++++++++++++-- tests/components/test_websocket_api.py | 108 +++++++++---- 8 files changed, 466 insertions(+), 81 deletions(-) create mode 100644 homeassistant/auth_providers/legacy_api_password.py create mode 100644 tests/auth_providers/test_legacy_api_password.py diff --git a/homeassistant/auth.py b/homeassistant/auth.py index 22abcdf213ccc..f56e00bf31e55 100644 --- a/homeassistant/auth.py +++ b/homeassistant/auth.py @@ -280,6 +280,18 @@ def active(self): """Return if any auth providers are registered.""" return bool(self._providers) + @property + def support_legacy(self): + """ + Return if legacy_api_password auth providers are registered. + + Should be removed when we removed legacy_api_password auth providers. + """ + for provider_type, _ in self._providers: + if provider_type == 'legacy_api_password': + return True + return False + @property def async_auth_providers(self): """Return a list of available auth providers.""" @@ -534,7 +546,7 @@ async def async_load(self): client_id=rt_dict['client_id'], created_at=dt_util.parse_datetime(rt_dict['created_at']), access_token_expiration=timedelta( - rt_dict['access_token_expiration']), + seconds=rt_dict['access_token_expiration']), token=rt_dict['token'], ) refresh_tokens[token.id] = token diff --git a/homeassistant/auth_providers/legacy_api_password.py b/homeassistant/auth_providers/legacy_api_password.py new file mode 100644 index 0000000000000..510cc4d02792f --- /dev/null +++ b/homeassistant/auth_providers/legacy_api_password.py @@ -0,0 +1,104 @@ +""" +Support Legacy API password auth provider. + +It will be removed when auth system production ready +""" +from collections import OrderedDict +import hmac + +import voluptuous as vol + +from homeassistant.exceptions import HomeAssistantError +from homeassistant import auth, data_entry_flow +from homeassistant.core import callback + +USER_SCHEMA = vol.Schema({ + vol.Required('username'): str, +}) + + +CONFIG_SCHEMA = auth.AUTH_PROVIDER_SCHEMA.extend({ +}, extra=vol.PREVENT_EXTRA) + +LEGACY_USER = 'homeassistant' + + +class InvalidAuthError(HomeAssistantError): + """Raised when submitting invalid authentication.""" + + +@auth.AUTH_PROVIDERS.register('legacy_api_password') +class LegacyApiPasswordAuthProvider(auth.AuthProvider): + """Example auth provider based on hardcoded usernames and passwords.""" + + DEFAULT_TITLE = 'Legacy API Password' + + async def async_credential_flow(self): + """Return a flow to login.""" + return LoginFlow(self) + + @callback + def async_validate_login(self, password): + """Helper to validate a username and password.""" + if not hasattr(self.hass, 'http'): + raise ValueError('http component is not loaded') + + if self.hass.http.api_password is None: + raise ValueError('http component is not configured using' + ' api_password') + + if not hmac.compare_digest(self.hass.http.api_password.encode('utf-8'), + password.encode('utf-8')): + raise InvalidAuthError + + async def async_get_or_create_credentials(self, flow_result): + """Return LEGACY_USER always.""" + for credential in await self.async_credentials(): + if credential.data['username'] == LEGACY_USER: + return credential + + return self.async_create_credentials({ + 'username': LEGACY_USER + }) + + async def async_user_meta_for_credentials(self, credentials): + """ + Set name as LEGACY_USER always. + + Will be used to populate info when creating a new user. + """ + return {'name': LEGACY_USER} + + +class LoginFlow(data_entry_flow.FlowHandler): + """Handler for the login flow.""" + + def __init__(self, auth_provider): + """Initialize the login flow.""" + self._auth_provider = auth_provider + + async def async_step_init(self, user_input=None): + """Handle the step of the form.""" + errors = {} + + if user_input is not None: + try: + self._auth_provider.async_validate_login( + user_input['password']) + except InvalidAuthError: + errors['base'] = 'invalid_auth' + + if not errors: + return self.async_create_entry( + title=self._auth_provider.name, + data={} + ) + + schema = OrderedDict() + schema['password'] = str + + return self.async_show_form( + step_id='init', + data_schema=vol.Schema(schema), + errors=errors, + ) diff --git a/homeassistant/components/http/__init__.py b/homeassistant/components/http/__init__.py index 485433434fd0d..37a6805dfb58b 100644 --- a/homeassistant/components/http/__init__.py +++ b/homeassistant/components/http/__init__.py @@ -184,7 +184,22 @@ def __init__(self, hass, api_password, if is_ban_enabled: setup_bans(hass, app, login_threshold) - setup_auth(app, trusted_networks, api_password) + if hass.auth.active: + if hass.auth.support_legacy: + _LOGGER.warning("Experimental auth api enabled and " + "legacy_api_password support enabled. Please " + "use access_token instead api_password, " + "although you can still use legacy " + "api_password") + else: + _LOGGER.warning("Experimental auth api enabled. Please use " + "access_token instead api_password.") + elif api_password is None: + _LOGGER.warning("You have been advised to set http.api_password.") + + setup_auth(app, trusted_networks, hass.auth.active, + support_legacy=hass.auth.support_legacy, + api_password=api_password) if cors_origins: setup_cors(app, cors_origins) diff --git a/homeassistant/components/http/auth.py b/homeassistant/components/http/auth.py index c4723abccee34..a232d9295a4d7 100644 --- a/homeassistant/components/http/auth.py +++ b/homeassistant/components/http/auth.py @@ -17,37 +17,44 @@ @callback -def setup_auth(app, trusted_networks, api_password): +def setup_auth(app, trusted_networks, use_auth, + support_legacy=False, api_password=None): """Create auth middleware for the app.""" @middleware async def auth_middleware(request, handler): """Authenticate as middleware.""" - # If no password set, just always set authenticated=True - if api_password is None: - request[KEY_AUTHENTICATED] = True - return await handler(request) - - # Check authentication authenticated = False - if (HTTP_HEADER_HA_AUTH in request.headers and - hmac.compare_digest( - api_password.encode('utf-8'), - request.headers[HTTP_HEADER_HA_AUTH].encode('utf-8'))): + if use_auth and (HTTP_HEADER_HA_AUTH in request.headers or + DATA_API_PASSWORD in request.query): + _LOGGER.warning('Please use access_token instead api_password.') + + legacy_auth = (not use_auth or support_legacy) and api_password + if (hdrs.AUTHORIZATION in request.headers and + await async_validate_auth_header( + request, api_password if legacy_auth else None)): + # it included both use_auth and api_password Basic auth + authenticated = True + + elif (legacy_auth and HTTP_HEADER_HA_AUTH in request.headers and + hmac.compare_digest( + api_password.encode('utf-8'), + request.headers[HTTP_HEADER_HA_AUTH].encode('utf-8'))): # A valid auth header has been set authenticated = True - elif (DATA_API_PASSWORD in request.query and + elif (legacy_auth and DATA_API_PASSWORD in request.query and hmac.compare_digest( api_password.encode('utf-8'), request.query[DATA_API_PASSWORD].encode('utf-8'))): authenticated = True - elif (hdrs.AUTHORIZATION in request.headers and - await async_validate_auth_header(api_password, request)): + elif _is_trusted_ip(request, trusted_networks): authenticated = True - elif _is_trusted_ip(request, trusted_networks): + elif not use_auth and api_password is None: + # If neither password nor auth_providers set, + # just always set authenticated=True authenticated = True request[KEY_AUTHENTICATED] = authenticated @@ -76,8 +83,12 @@ def validate_password(request, api_password): request.app['hass'].http.api_password.encode('utf-8')) -async def async_validate_auth_header(api_password, request): - """Test an authorization header if valid password.""" +async def async_validate_auth_header(request, api_password=None): + """ + Test authorization header against access token. + + Basic auth_type is legacy code, should be removed with api_password. + """ if hdrs.AUTHORIZATION not in request.headers: return False @@ -88,7 +99,16 @@ async def async_validate_auth_header(api_password, request): # If no space in authorization header return False - if auth_type == 'Basic': + if auth_type == 'Bearer': + hass = request.app['hass'] + access_token = hass.auth.async_get_access_token(auth_val) + if access_token is None: + return False + + request['hass_user'] = access_token.refresh_token.user + return True + + elif auth_type == 'Basic' and api_password is not None: decoded = base64.b64decode(auth_val).decode('utf-8') try: username, password = decoded.split(':', 1) @@ -102,13 +122,5 @@ async def async_validate_auth_header(api_password, request): return hmac.compare_digest(api_password.encode('utf-8'), password.encode('utf-8')) - if auth_type != 'Bearer': + else: return False - - hass = request.app['hass'] - access_token = hass.auth.async_get_access_token(auth_val) - if access_token is None: - return False - - request['hass_user'] = access_token.refresh_token.user - return True diff --git a/homeassistant/components/websocket_api.py b/homeassistant/components/websocket_api.py index bf472348babf8..c26f68a2c29f0 100644 --- a/homeassistant/components/websocket_api.py +++ b/homeassistant/components/websocket_api.py @@ -315,26 +315,32 @@ def handle_hass_stop(event): authenticated = True else: + self.debug("Request auth") await self.wsock.send_json(auth_required_message()) msg = await wsock.receive_json() msg = AUTH_MESSAGE_SCHEMA(msg) - if 'api_password' in msg: - authenticated = validate_password( - request, msg['api_password']) - - elif 'access_token' in msg: + if self.hass.auth.active and 'access_token' in msg: + self.debug("Received access_token") token = self.hass.auth.async_get_access_token( msg['access_token']) authenticated = token is not None + elif ((not self.hass.auth.active or + self.hass.auth.support_legacy) and + 'api_password' in msg): + self.debug("Received api_password") + authenticated = validate_password( + request, msg['api_password']) + if not authenticated: - self.debug("Invalid password") + self.debug("Authorization failed") await self.wsock.send_json( - auth_invalid_message('Invalid password')) + auth_invalid_message('Invalid access token or password')) await process_wrong_login(request) return wsock + self.debug("Auth OK") await self.wsock.send_json(auth_ok_message()) # ---------- AUTH PHASE OVER ---------- @@ -392,7 +398,7 @@ def handle_hass_stop(event): if wsock.closed: self.debug("Connection closed by client") else: - _LOGGER.exception("Unexpected TypeError: %s", msg) + _LOGGER.exception("Unexpected TypeError: %s", err) except ValueError as err: msg = "Received invalid JSON" @@ -403,7 +409,7 @@ def handle_hass_stop(event): self._writer_task.cancel() except CANCELLATION_ERRORS: - self.debug("Connection cancelled by server") + self.debug("Connection cancelled") except asyncio.QueueFull: self.log_error("Client exceeded max pending messages [1]:", diff --git a/tests/auth_providers/test_legacy_api_password.py b/tests/auth_providers/test_legacy_api_password.py new file mode 100644 index 0000000000000..7a8f17894aa1f --- /dev/null +++ b/tests/auth_providers/test_legacy_api_password.py @@ -0,0 +1,67 @@ +"""Tests for the legacy_api_password auth provider.""" +from unittest.mock import Mock + +import pytest + +from homeassistant import auth +from homeassistant.auth_providers import legacy_api_password + + +@pytest.fixture +def store(hass): + """Mock store.""" + return auth.AuthStore(hass) + + +@pytest.fixture +def provider(hass, store): + """Mock provider.""" + return legacy_api_password.LegacyApiPasswordAuthProvider(hass, store, { + 'type': 'legacy_api_password', + }) + + +async def test_create_new_credential(provider): + """Test that we create a new credential.""" + credentials = await provider.async_get_or_create_credentials({}) + assert credentials.data["username"] is legacy_api_password.LEGACY_USER + assert credentials.is_new is True + + +async def test_only_one_credentials(store, provider): + """Call create twice will return same credential.""" + credentials = await provider.async_get_or_create_credentials({}) + await store.async_get_or_create_user(credentials, provider) + credentials2 = await provider.async_get_or_create_credentials({}) + assert credentials2.data["username"] is legacy_api_password.LEGACY_USER + assert credentials2.id is credentials.id + assert credentials2.is_new is False + + +async def test_verify_not_load(hass, provider): + """Test we raise if http module not load.""" + with pytest.raises(ValueError): + provider.async_validate_login('test-password') + hass.http = Mock(api_password=None) + with pytest.raises(ValueError): + provider.async_validate_login('test-password') + hass.http = Mock(api_password='test-password') + provider.async_validate_login('test-password') + + +async def test_verify_login(hass, provider): + """Test we raise if http module not load.""" + hass.http = Mock(api_password='test-password') + provider.async_validate_login('test-password') + hass.http = Mock(api_password='test-password') + with pytest.raises(legacy_api_password.InvalidAuthError): + provider.async_validate_login('invalid-password') + + +async def test_utf_8_username_password(provider): + """Test that we create a new credential.""" + credentials = await provider.async_get_or_create_credentials({ + 'username': '🎉', + 'password': '😎', + }) + assert credentials.is_new is True diff --git a/tests/components/http/test_auth.py b/tests/components/http/test_auth.py index dd8b2cd35c46c..3e5eed4c924b5 100644 --- a/tests/components/http/test_auth.py +++ b/tests/components/http/test_auth.py @@ -1,20 +1,23 @@ """The tests for the Home Assistant HTTP component.""" # pylint: disable=protected-access from ipaddress import ip_network -from unittest.mock import patch +from unittest.mock import patch, Mock +import pytest from aiohttp import BasicAuth, web from aiohttp.web_exceptions import HTTPUnauthorized -import pytest -from homeassistant.const import HTTP_HEADER_HA_AUTH -from homeassistant.setup import async_setup_component +from homeassistant.auth import AccessToken, RefreshToken from homeassistant.components.http.auth import setup_auth -from homeassistant.components.http.real_ip import setup_real_ip from homeassistant.components.http.const import KEY_AUTHENTICATED - +from homeassistant.components.http.real_ip import setup_real_ip +from homeassistant.const import HTTP_HEADER_HA_AUTH +from homeassistant.setup import async_setup_component from . import mock_real_ip + +ACCESS_TOKEN = 'tk.1234' + API_PASSWORD = 'test1234' # Don't add 127.0.0.1/::1 as trusted, as it may interfere with other test cases @@ -36,15 +39,37 @@ async def mock_handler(request): return web.Response(status=200) +def mock_async_get_access_token(token): + """Return if token is valid.""" + if token == ACCESS_TOKEN: + return Mock(spec=AccessToken, + token=ACCESS_TOKEN, + refresh_token=Mock(spec=RefreshToken)) + else: + return None + + @pytest.fixture def app(): """Fixture to setup a web.Application.""" app = web.Application() + mock_auth = Mock(async_get_access_token=mock_async_get_access_token) + app['hass'] = Mock(auth=mock_auth) app.router.add_get('/', mock_handler) setup_real_ip(app, False, []) return app +@pytest.fixture +def app2(): + """Fixture to setup a web.Application without real_ip middleware.""" + app = web.Application() + mock_auth = Mock(async_get_access_token=mock_async_get_access_token) + app['hass'] = Mock(auth=mock_auth) + app.router.add_get('/', mock_handler) + return app + + async def test_auth_middleware_loaded_by_default(hass): """Test accessing to server from banned IP when feature is off.""" with patch('homeassistant.components.http.setup_auth') as mock_setup: @@ -57,7 +82,7 @@ async def test_auth_middleware_loaded_by_default(hass): async def test_access_without_password(app, aiohttp_client): """Test access without password.""" - setup_auth(app, [], None) + setup_auth(app, [], False, api_password=None) client = await aiohttp_client(app) resp = await client.get('/') @@ -65,8 +90,8 @@ async def test_access_without_password(app, aiohttp_client): async def test_access_with_password_in_header(app, aiohttp_client): - """Test access with password in URL.""" - setup_auth(app, [], API_PASSWORD) + """Test access with password in header.""" + setup_auth(app, [], False, api_password=API_PASSWORD) client = await aiohttp_client(app) req = await client.get( @@ -79,8 +104,8 @@ async def test_access_with_password_in_header(app, aiohttp_client): async def test_access_with_password_in_query(app, aiohttp_client): - """Test access without password.""" - setup_auth(app, [], API_PASSWORD) + """Test access with password in URL.""" + setup_auth(app, [], False, api_password=API_PASSWORD) client = await aiohttp_client(app) resp = await client.get('/', params={ @@ -99,7 +124,7 @@ async def test_access_with_password_in_query(app, aiohttp_client): async def test_basic_auth_works(app, aiohttp_client): """Test access with basic authentication.""" - setup_auth(app, [], API_PASSWORD) + setup_auth(app, [], False, api_password=API_PASSWORD) client = await aiohttp_client(app) req = await client.get( @@ -125,16 +150,64 @@ async def test_basic_auth_works(app, aiohttp_client): assert req.status == 401 -async def test_access_with_trusted_ip(aiohttp_client): +async def test_access_with_trusted_ip(app2, aiohttp_client): """Test access with an untrusted ip address.""" - app = web.Application() - app.router.add_get('/', mock_handler) + setup_auth(app2, TRUSTED_NETWORKS, False, api_password='some-pass') - setup_auth(app, TRUSTED_NETWORKS, 'some-pass') + set_mock_ip = mock_real_ip(app2) + client = await aiohttp_client(app2) - set_mock_ip = mock_real_ip(app) + for remote_addr in UNTRUSTED_ADDRESSES: + set_mock_ip(remote_addr) + resp = await client.get('/') + assert resp.status == 401, \ + "{} shouldn't be trusted".format(remote_addr) + + for remote_addr in TRUSTED_ADDRESSES: + set_mock_ip(remote_addr) + resp = await client.get('/') + assert resp.status == 200, \ + "{} should be trusted".format(remote_addr) + + +async def test_auth_active_access_with_access_token_in_header( + app, aiohttp_client): + """Test access with access token in header.""" + setup_auth(app, [], True, api_password=None) client = await aiohttp_client(app) + req = await client.get( + '/', headers={'Authorization': 'Bearer {}'.format(ACCESS_TOKEN)}) + assert req.status == 200 + + req = await client.get( + '/', headers={'AUTHORIZATION': 'Bearer {}'.format(ACCESS_TOKEN)}) + assert req.status == 200 + + req = await client.get( + '/', headers={'authorization': 'Bearer {}'.format(ACCESS_TOKEN)}) + assert req.status == 200 + + req = await client.get( + '/', headers={'Authorization': ACCESS_TOKEN}) + assert req.status == 401 + + req = await client.get( + '/', headers={'Authorization': 'BEARER {}'.format(ACCESS_TOKEN)}) + assert req.status == 401 + + req = await client.get( + '/', headers={'Authorization': 'Bearer wrong-pass'}) + assert req.status == 401 + + +async def test_auth_active_access_with_trusted_ip(app2, aiohttp_client): + """Test access with an untrusted ip address.""" + setup_auth(app2, TRUSTED_NETWORKS, True, api_password=None) + + set_mock_ip = mock_real_ip(app2) + client = await aiohttp_client(app2) + for remote_addr in UNTRUSTED_ADDRESSES: set_mock_ip(remote_addr) resp = await client.get('/') @@ -146,3 +219,43 @@ async def test_access_with_trusted_ip(aiohttp_client): resp = await client.get('/') assert resp.status == 200, \ "{} should be trusted".format(remote_addr) + + +async def test_auth_active_blocked_api_password_access(app, aiohttp_client): + """Test access using api_password should be blocked when auth.active.""" + setup_auth(app, [], True, api_password=API_PASSWORD) + client = await aiohttp_client(app) + + req = await client.get( + '/', headers={HTTP_HEADER_HA_AUTH: API_PASSWORD}) + assert req.status == 401 + + resp = await client.get('/', params={ + 'api_password': API_PASSWORD + }) + assert resp.status == 401 + + req = await client.get( + '/', + auth=BasicAuth('homeassistant', API_PASSWORD)) + assert req.status == 401 + + +async def test_auth_legacy_support_api_password_access(app, aiohttp_client): + """Test access using api_password if auth.support_legacy.""" + setup_auth(app, [], True, support_legacy=True, api_password=API_PASSWORD) + client = await aiohttp_client(app) + + req = await client.get( + '/', headers={HTTP_HEADER_HA_AUTH: API_PASSWORD}) + assert req.status == 200 + + resp = await client.get('/', params={ + 'api_password': API_PASSWORD + }) + assert resp.status == 200 + + req = await client.get( + '/', + auth=BasicAuth('homeassistant', API_PASSWORD)) + assert req.status == 200 diff --git a/tests/components/test_websocket_api.py b/tests/components/test_websocket_api.py index fbd8584a7d14a..6ea90bcdb88f2 100644 --- a/tests/components/test_websocket_api.py +++ b/tests/components/test_websocket_api.py @@ -77,7 +77,7 @@ def test_auth_via_msg_incorrect_pass(no_auth_websocket_client): assert mock_process_wrong_login.called assert msg['type'] == wapi.TYPE_AUTH_INVALID - assert msg['message'] == 'Invalid password' + assert msg['message'] == 'Invalid access token or password' @asyncio.coroutine @@ -316,47 +316,103 @@ def test_unknown_command(websocket_client): assert msg['error']['code'] == wapi.ERR_UNKNOWN_COMMAND -async def test_auth_with_token(hass, aiohttp_client, hass_access_token): +async def test_auth_active_with_token(hass, aiohttp_client, hass_access_token): """Test authenticating with a token.""" assert await async_setup_component(hass, 'websocket_api', { - 'http': { - 'api_password': API_PASSWORD - } - }) + 'http': { + 'api_password': API_PASSWORD + } + }) client = await aiohttp_client(hass.http.app) async with client.ws_connect(wapi.URL) as ws: - auth_msg = await ws.receive_json() - assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED + with patch('homeassistant.auth.AuthManager.active') as auth_active: + auth_active.return_value = True + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED - await ws.send_json({ - 'type': wapi.TYPE_AUTH, - 'access_token': hass_access_token.token - }) + await ws.send_json({ + 'type': wapi.TYPE_AUTH, + 'access_token': hass_access_token.token + }) - auth_msg = await ws.receive_json() - assert auth_msg['type'] == wapi.TYPE_AUTH_OK + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_OK -async def test_auth_with_invalid_token(hass, aiohttp_client): +async def test_auth_active_with_password_not_allow(hass, aiohttp_client): """Test authenticating with a token.""" assert await async_setup_component(hass, 'websocket_api', { - 'http': { + 'http': { + 'api_password': API_PASSWORD + } + }) + + client = await aiohttp_client(hass.http.app) + + async with client.ws_connect(wapi.URL) as ws: + with patch('homeassistant.auth.AuthManager.active', + return_value=True): + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED + + await ws.send_json({ + 'type': wapi.TYPE_AUTH, 'api_password': API_PASSWORD - } - }) + }) + + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID + + +async def test_auth_legacy_support_with_password(hass, aiohttp_client): + """Test authenticating with a token.""" + assert await async_setup_component(hass, 'websocket_api', { + 'http': { + 'api_password': API_PASSWORD + } + }) client = await aiohttp_client(hass.http.app) async with client.ws_connect(wapi.URL) as ws: - auth_msg = await ws.receive_json() - assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED + with patch('homeassistant.auth.AuthManager.active', + return_value=True),\ + patch('homeassistant.auth.AuthManager.support_legacy', + return_value=True): + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED + + await ws.send_json({ + 'type': wapi.TYPE_AUTH, + 'api_password': API_PASSWORD + }) + + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_OK - await ws.send_json({ - 'type': wapi.TYPE_AUTH, - 'access_token': 'incorrect' - }) - auth_msg = await ws.receive_json() - assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID +async def test_auth_with_invalid_token(hass, aiohttp_client): + """Test authenticating with a token.""" + assert await async_setup_component(hass, 'websocket_api', { + 'http': { + 'api_password': API_PASSWORD + } + }) + + client = await aiohttp_client(hass.http.app) + + async with client.ws_connect(wapi.URL) as ws: + with patch('homeassistant.auth.AuthManager.active') as auth_active: + auth_active.return_value = True + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_REQUIRED + + await ws.send_json({ + 'type': wapi.TYPE_AUTH, + 'access_token': 'incorrect' + }) + + auth_msg = await ws.receive_json() + assert auth_msg['type'] == wapi.TYPE_AUTH_INVALID