-
-
Notifications
You must be signed in to change notification settings - Fork 32.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
By default to use access_token if hass.auth.active (#15212)
* Force to use access_token if hass.auth.active * Not allow Basic auth with api_password if hass.auth.active * Block websocket api_password auth when hass.auth.active * Add legacy_api_password auth provider * lint * lint
- Loading branch information
Showing
8 changed files
with
466 additions
and
81 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
""" | ||
Support Legacy API password auth provider. | ||
It will be removed when auth system production ready | ||
""" | ||
from collections import OrderedDict | ||
import hmac | ||
|
||
import voluptuous as vol | ||
|
||
from homeassistant.exceptions import HomeAssistantError | ||
from homeassistant import auth, data_entry_flow | ||
from homeassistant.core import callback | ||
|
||
USER_SCHEMA = vol.Schema({ | ||
vol.Required('username'): str, | ||
}) | ||
|
||
|
||
CONFIG_SCHEMA = auth.AUTH_PROVIDER_SCHEMA.extend({ | ||
}, extra=vol.PREVENT_EXTRA) | ||
|
||
LEGACY_USER = 'homeassistant' | ||
|
||
|
||
class InvalidAuthError(HomeAssistantError): | ||
"""Raised when submitting invalid authentication.""" | ||
|
||
|
||
@auth.AUTH_PROVIDERS.register('legacy_api_password') | ||
class LegacyApiPasswordAuthProvider(auth.AuthProvider): | ||
"""Example auth provider based on hardcoded usernames and passwords.""" | ||
|
||
DEFAULT_TITLE = 'Legacy API Password' | ||
|
||
async def async_credential_flow(self): | ||
"""Return a flow to login.""" | ||
return LoginFlow(self) | ||
|
||
@callback | ||
def async_validate_login(self, password): | ||
"""Helper to validate a username and password.""" | ||
if not hasattr(self.hass, 'http'): | ||
raise ValueError('http component is not loaded') | ||
|
||
if self.hass.http.api_password is None: | ||
raise ValueError('http component is not configured using' | ||
' api_password') | ||
|
||
if not hmac.compare_digest(self.hass.http.api_password.encode('utf-8'), | ||
password.encode('utf-8')): | ||
raise InvalidAuthError | ||
|
||
async def async_get_or_create_credentials(self, flow_result): | ||
"""Return LEGACY_USER always.""" | ||
for credential in await self.async_credentials(): | ||
if credential.data['username'] == LEGACY_USER: | ||
return credential | ||
|
||
return self.async_create_credentials({ | ||
'username': LEGACY_USER | ||
}) | ||
|
||
async def async_user_meta_for_credentials(self, credentials): | ||
""" | ||
Set name as LEGACY_USER always. | ||
Will be used to populate info when creating a new user. | ||
""" | ||
return {'name': LEGACY_USER} | ||
|
||
|
||
class LoginFlow(data_entry_flow.FlowHandler): | ||
"""Handler for the login flow.""" | ||
|
||
def __init__(self, auth_provider): | ||
"""Initialize the login flow.""" | ||
self._auth_provider = auth_provider | ||
|
||
async def async_step_init(self, user_input=None): | ||
"""Handle the step of the form.""" | ||
errors = {} | ||
|
||
if user_input is not None: | ||
try: | ||
self._auth_provider.async_validate_login( | ||
user_input['password']) | ||
except InvalidAuthError: | ||
errors['base'] = 'invalid_auth' | ||
|
||
if not errors: | ||
return self.async_create_entry( | ||
title=self._auth_provider.name, | ||
data={} | ||
) | ||
|
||
schema = OrderedDict() | ||
schema['password'] = str | ||
|
||
return self.async_show_form( | ||
step_id='init', | ||
data_schema=vol.Schema(schema), | ||
errors=errors, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
"""Tests for the legacy_api_password auth provider.""" | ||
from unittest.mock import Mock | ||
|
||
import pytest | ||
|
||
from homeassistant import auth | ||
from homeassistant.auth_providers import legacy_api_password | ||
|
||
|
||
@pytest.fixture | ||
def store(hass): | ||
"""Mock store.""" | ||
return auth.AuthStore(hass) | ||
|
||
|
||
@pytest.fixture | ||
def provider(hass, store): | ||
"""Mock provider.""" | ||
return legacy_api_password.LegacyApiPasswordAuthProvider(hass, store, { | ||
'type': 'legacy_api_password', | ||
}) | ||
|
||
|
||
async def test_create_new_credential(provider): | ||
"""Test that we create a new credential.""" | ||
credentials = await provider.async_get_or_create_credentials({}) | ||
assert credentials.data["username"] is legacy_api_password.LEGACY_USER | ||
assert credentials.is_new is True | ||
|
||
|
||
async def test_only_one_credentials(store, provider): | ||
"""Call create twice will return same credential.""" | ||
credentials = await provider.async_get_or_create_credentials({}) | ||
await store.async_get_or_create_user(credentials, provider) | ||
credentials2 = await provider.async_get_or_create_credentials({}) | ||
assert credentials2.data["username"] is legacy_api_password.LEGACY_USER | ||
assert credentials2.id is credentials.id | ||
assert credentials2.is_new is False | ||
|
||
|
||
async def test_verify_not_load(hass, provider): | ||
"""Test we raise if http module not load.""" | ||
with pytest.raises(ValueError): | ||
provider.async_validate_login('test-password') | ||
hass.http = Mock(api_password=None) | ||
with pytest.raises(ValueError): | ||
provider.async_validate_login('test-password') | ||
hass.http = Mock(api_password='test-password') | ||
provider.async_validate_login('test-password') | ||
|
||
|
||
async def test_verify_login(hass, provider): | ||
"""Test we raise if http module not load.""" | ||
hass.http = Mock(api_password='test-password') | ||
provider.async_validate_login('test-password') | ||
hass.http = Mock(api_password='test-password') | ||
with pytest.raises(legacy_api_password.InvalidAuthError): | ||
provider.async_validate_login('invalid-password') | ||
|
||
|
||
async def test_utf_8_username_password(provider): | ||
"""Test that we create a new credential.""" | ||
credentials = await provider.async_get_or_create_credentials({ | ||
'username': '🎉', | ||
'password': '😎', | ||
}) | ||
assert credentials.is_new is True |
Oops, something went wrong.