diff --git a/lib/pbench/server/api/resources/endpoint_configure.py b/lib/pbench/server/api/resources/endpoint_configure.py index bb60f582d6..355c5a25d6 100644 --- a/lib/pbench/server/api/resources/endpoint_configure.py +++ b/lib/pbench/server/api/resources/endpoint_configure.py @@ -180,7 +180,6 @@ def get(self): try: client = self.server_config.get("openid-connect", "client") realm = self.server_config.get("openid-connect", "realm") - secret = self.server_config.get("openid-connect", "secret") server = self.server_config.get("openid-connect", "server_url") except (NoOptionError, NoSectionError): pass @@ -188,7 +187,6 @@ def get(self): endpoints["openid"] = { "client": client, "realm": realm, - "secret": secret, "server": server, } diff --git a/lib/pbench/server/auth/__init__.py b/lib/pbench/server/auth/__init__.py index 9e833ea961..878c9bd234 100644 --- a/lib/pbench/server/auth/__init__.py +++ b/lib/pbench/server/auth/__init__.py @@ -305,24 +305,23 @@ def construct_oidc_client(cls, server_config: PbenchServerConfig) -> "OpenIDClie server_url = server_config.get("openid-connect", "server_url") client = server_config.get("openid-connect", "client") realm = server_config.get("openid-connect", "realm") - secret = server_config.get("openid-connect", "secret") except (NoOptionError, NoSectionError) as exc: raise OpenIDClient.NotConfigured() from exc - return cls( + oidc_client = cls( server_url=server_url, client_id=client, realm_name=realm, - client_secret_key=secret, verify=False, ) + oidc_client.set_oidc_public_key() + return oidc_client def __init__( self, server_url: str, client_id: str, realm_name: str, - client_secret_key: str, verify: bool = True, headers: Optional[Dict[str, str]] = None, ): @@ -341,16 +340,24 @@ def __init__( server_url : OpenID Connect server auth url client_id : client id realm_name : realm name - client_secret_key : client secret key verify : True if require valid SSL headers : dict of custom header to pass to each HTML request """ self.client_id = client_id - self._client_secret_key = client_secret_key self._realm_name = realm_name self._connection = Connection(server_url, headers, verify) + self._pem_public_key = None + + def __repr__(self): + return ( + f"OpenIDClient(server_url={self._connection.server_url}, " + f"client_id={self.client_id}, realm_name={self._realm_name}, " + f"headers={self._connection.headers})" + ) + + def set_oidc_public_key(self): realm_public_key_uri = f"realms/{self._realm_name}" response_json = self._connection.get(realm_public_key_uri).json() public_key = response_json["public_key"] @@ -362,13 +369,6 @@ def __init__( pem_public_key += "-----END PUBLIC KEY-----\n" self._pem_public_key = pem_public_key - def __repr__(self): - return ( - f"OpenIDClient(server_url={self._connection.server_url}, " - f"client_id={self.client_id}, realm_name={self._realm_name}, " - f"headers={self._connection.headers})" - ) - def token_introspect(self, token: str) -> JSON: """Utility method to decode access/Id tokens using the public key provided by the identity provider. diff --git a/lib/pbench/test/unit/server/auth/test_auth.py b/lib/pbench/test/unit/server/auth/test_auth.py index 27558c88e5..2aadca49d3 100644 --- a/lib/pbench/test/unit/server/auth/test_auth.py +++ b/lib/pbench/test/unit/server/auth/test_auth.py @@ -3,8 +3,6 @@ from http import HTTPStatus from typing import Dict, Optional, Tuple, Union -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from flask import current_app, Flask import jwt import pytest @@ -252,33 +250,6 @@ def test_create_w_roles(self): assert user.roles == ["roleA", "roleB"] -@pytest.fixture(scope="session") -def rsa_keys() -> Dict[str, Union[rsa.RSAPrivateKey, str]]: - """Fixture for generating an RSA public / private key pair. - - Returns: - A dictionary containing the RSAPrivateKey object, the PEM encoded public - key string without the BEGIN/END bookends (mimicing what is returned by - an OpenID Connect broker), and the PEM encoded public key string with - the BEGIN/END bookends - """ - private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) - pem_public_key = ( - private_key.public_key() - .public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) - .decode() - ) - # Strip "-----BEGIN..." and "-----END...", and the empty element resulting - # from the trailing newline character. - public_key_l = pem_public_key.split("\n")[1:-2] - public_key = "".join(public_key_l) - return { - "private_key": private_key, - "public_key": public_key, - "pem_public_key": pem_public_key, - } - - def gen_rsa_token( audience: str, private_key: str, exp: str = "99999999999" ) -> Tuple[str, Dict[str, str]]: @@ -397,7 +368,6 @@ def test_construct_oidc_client_succ(self, monkeypatch): config = mock_connection(monkeypatch, client_id, public_key) server_url = config["openid-connect"]["server_url"] realm_name = config["openid-connect"]["realm"] - secret = config["openid-connect"]["secret"] oidc_client = OpenIDClient.construct_oidc_client(config) @@ -406,7 +376,6 @@ def test_construct_oidc_client_succ(self, monkeypatch): f"client_id={client_id}, realm_name={realm_name}, " "headers={})" ) - assert oidc_client._client_secret_key == secret assert ( oidc_client._pem_public_key == f"-----BEGIN PUBLIC KEY-----\n{public_key}\n-----END PUBLIC KEY-----\n" @@ -625,22 +594,6 @@ def delete(*args, **kwargs): user = Auth.verify_auth(pbench_drb_token_invalid) assert user is None - def test_verify_auth_internal_at_valid_fail( - self, monkeypatch, make_logger, pbench_drb_token - ): - """Verify behavior when a token is not in the database""" - - def query(*args, **kwargs): - return None - - monkeypatch.setattr(Auth.AuthToken, "query", query) - app = Flask("test-verify-auth-internal-at-valid-fail") - app.logger = make_logger - with app.app_context(): - current_app.secret_key = jwt_secret - user = Auth.verify_auth(pbench_drb_token) - assert user is None - def test_verify_auth_oidc_offline(self, monkeypatch, rsa_keys, make_logger): """Verify OIDC token offline verification success path""" client_id = "us" diff --git a/lib/pbench/test/unit/server/conftest.py b/lib/pbench/test/unit/server/conftest.py index da03cf88cc..8f132f1531 100644 --- a/lib/pbench/test/unit/server/conftest.py +++ b/lib/pbench/test/unit/server/conftest.py @@ -10,12 +10,16 @@ from stat import ST_MTIME import tarfile from typing import Dict, Optional +from urllib.parse import urljoin import uuid +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from freezegun import freeze_time import jwt import pytest from requests import Response +import responses from pbench.common import MetadataLog from pbench.common.logger import get_pbench_logger @@ -44,6 +48,9 @@ [flask-app] secret-key = my_precious +[openid-connect] +server_url = http://openid.example.com + [logging] logger_type = null # We run with DEBUG level logging during the server unit tests to help @@ -120,12 +127,62 @@ def server_config(on_disk_server_config) -> PbenchServerConfig: return server_config +@pytest.fixture(scope="session") +def rsa_keys(): + """Fixture for generating an RSA public / private key pair. + + Returns: + A dictionary containing the RSAPrivateKey object, the PEM encoded public + key string without the BEGIN/END bookends (mimicing what is returned by + an OpenID Connect broker), and the PEM encoded public key string with + the BEGIN/END bookends + """ + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pem_public_key = ( + private_key.public_key() + .public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) + .decode() + ) + # Strip "-----BEGIN..." and "-----END...", and the empty element resulting + # from the trailing newline character. + public_key_l = pem_public_key.split("\n")[1:-2] + public_key = "".join(public_key_l) + return { + "private_key": private_key, + "public_key": public_key, + "pem_public_key": pem_public_key, + } + + +@pytest.fixture(scope="session") +def add_auth_connection_mock(server_config, rsa_keys): + """ + Mocks the OIDC public key GET Requests call on the realm uri. + Args: + server_config: Server_config fixture + rsa_keys: rsa_keys fixture to get te public key + """ + with responses.RequestsMock() as mock: + oidc_server = server_config.get("openid-connect", "server_url") + oidc_realm = server_config.get("openid-connect", "realm") + url = urljoin(oidc_server, f"realms/{oidc_realm}") + + mock.add( + responses.GET, + url, + status=HTTPStatus.OK, + json={"public_key": rsa_keys["public_key"]}, + ) + yield mock + + @pytest.fixture() -def client(monkeypatch, server_config): +def client(monkeypatch, server_config, add_auth_connection_mock): """A test client for the app. Fixtures: server_config: Set up a pbench-server.cfg configuration + add_auth_connection_mock: set up a mock OIDC connection NOTE: The Flask app initialization includes setting up the SQLAlchemy DB. For test cases that require the DB but not a full Flask app context, use @@ -725,25 +782,42 @@ def fake_find(name: str) -> Optional[Template]: @pytest.fixture() -def pbench_admin_token(client, create_admin_user): - """Internal valid token for the 'ADMIN' user""" - return generate_token(user=create_admin_user, username=admin_username) +def pbench_admin_token(client, server_config, create_admin_user, rsa_keys): + """OIDC valid token for the 'ADMIN' user""" + return generate_token( + user=create_admin_user, + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + username=admin_username, + pbench_client_roles=["ADMIN"], + ) @pytest.fixture() -def pbench_drb_token(client, create_drb_user): - """Internal valid token for the 'drb' user""" - return generate_token(username="drb", user=create_drb_user) +def pbench_drb_token(client, server_config, create_drb_user, rsa_keys): + """OIDC valid token for the 'drb' user""" + return generate_token( + username="drb", + client_id=server_config.get("openid-connect", "client"), + private_key=rsa_keys["private_key"], + user=create_drb_user, + ) @pytest.fixture() -def pbench_drb_token_invalid(client, create_drb_user): - """Internal invalid token for the 'drb' user""" - return generate_token(username="drb", user=create_drb_user, valid=False) +def pbench_drb_token_invalid(client, server_config, create_drb_user, rsa_keys): + """OIDC invalid token for the 'drb' user""" + return generate_token( + username="drb", + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + user=create_drb_user, + valid=False, + ) @pytest.fixture() -def get_token_func(pbench_admin_token): +def get_token_func(pbench_admin_token, server_config, rsa_keys): """Get the token function for fetching the token for a user This fixture yields a function value which can be called to get the internal @@ -754,23 +828,40 @@ def get_token_func(pbench_admin_token): generated. """ return lambda user: ( - pbench_admin_token if user == admin_username else generate_token(username=user) + pbench_admin_token + if user == admin_username + else generate_token( + username=user, + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + ) ) def generate_token( username: str, + private_key: str, + client_id: str, user: Optional[User] = None, + pbench_client_roles: Optional[list[str]] = None, valid: bool = True, ) -> str: - """Generates an internal JWT token that mimics a real internal token - obtained from an internal user login. + """Generates an OIDC JWT token that mimics a real OIDC token + obtained from the user login. + + Note: The OIDC client id passed as an argument has to match with the + oidc client id from the default config file. Otherwise the token + validation will fail in the server code. Args: - username : username to include in the token payload - user : user attributes will be extracted from the user object to include + username: username to include in the token payload + private_key: RS256 private key to encode the jwt token + client_id: OIDC client id to include in the encoded string. + user: user attributes will be extracted from the user object to include in the token payload. - valid : If True, the generated token will be valid for 10 mins. + pbench_client_roles: Any OIDC client specifc roles we want to include + in the token. + valid: If True, the generated token will be valid for 10 mins. If False, generated token would be invalid and expired Returns: @@ -788,9 +879,33 @@ def generate_token( "iat": current_utc, "exp": exp, "sub": user.id, + "aud": client_id, + "azp": client_id, + "realm_access": { + "roles": [ + "default-roles-pbench-server", + "offline_access", + "uma_authorization", + ] + }, + "resource_access": { + "broker": {"roles": ["read-token"]}, + "account": { + "roles": ["manage-account", "manage-account-links", "view-profile"] + }, + }, + "scope": "openid profile email", + "sid": "1988612e-774d-43b8-8d4a-bbc05ee55edb", + "email_verified": True, + "name": user.first_name + " " + user.last_name, + "preferred_username": username, + "given_name": user.first_name, + "family_name": user.last_name, + "email": user.email, } - token_str = jwt.encode(payload, jwt_secret, algorithm="HS256") - user.add_token(AuthToken(token=token_str, expiration=exp)) + if pbench_client_roles: + payload["resource_access"].update({client_id: {"roles": pbench_client_roles}}) + token_str = jwt.encode(payload, private_key, algorithm="RS256") return token_str diff --git a/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py b/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py index 399491ec76..28e5764d37 100644 --- a/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py +++ b/lib/pbench/test/unit/server/query_apis/test_datasets_detail.py @@ -48,6 +48,7 @@ def test_query( query_api, find_template, pbench_admin_token, + rsa_keys, user, expected_status, ): @@ -62,7 +63,11 @@ def test_query( if user == "test_admin": token = pbench_admin_token else: - token = generate_token(username=user) + token = generate_token( + username=user, + private_key=rsa_keys["private_key"], + client_id=server_config.get("openid-connect", "client"), + ) assert token headers = {"authorization": f"bearer {token}"} diff --git a/lib/pbench/test/unit/server/test_api_base.py b/lib/pbench/test/unit/server/test_api_base.py index 71515eaa2f..ba1c891771 100644 --- a/lib/pbench/test/unit/server/test_api_base.py +++ b/lib/pbench/test/unit/server/test_api_base.py @@ -79,7 +79,9 @@ def options(self, **kwargs) -> Response: class TestApiBase: """Verify internal methods of the API base class.""" - def test_method_validation(self, server_config, make_logger, monkeypatch): + def test_method_validation( + self, server_config, make_logger, monkeypatch, add_auth_connection_mock + ): # Create the temporary flask application. app = Flask("test-api-server") app.debug = True diff --git a/lib/pbench/test/unit/server/test_datasets_daterange.py b/lib/pbench/test/unit/server/test_datasets_daterange.py index fb20a11aba..af9ae7faa6 100644 --- a/lib/pbench/test/unit/server/test_datasets_daterange.py +++ b/lib/pbench/test/unit/server/test_datasets_daterange.py @@ -18,7 +18,13 @@ class TestDatasetsDateRange: @pytest.fixture() def query_as( - self, client, server_config, more_datasets, provide_metadata, get_token_func + self, + client, + server_config, + more_datasets, + provide_metadata, + get_token_func, + add_auth_connection_mock, ): """ Helper fixture to perform the API query and validate an expected diff --git a/lib/pbench/test/unit/server/test_endpoint_configure.py b/lib/pbench/test/unit/server/test_endpoint_configure.py index 123c47c471..a03ad32e53 100644 --- a/lib/pbench/test/unit/server/test_endpoint_configure.py +++ b/lib/pbench/test/unit/server/test_endpoint_configure.py @@ -136,15 +136,13 @@ def check_config(self, client, server_config, host, my_headers={}): try: oidc_client = server_config.get("openid-connect", "client") oidc_realm = server_config.get("openid-connect", "realm") - oidc_secret = server_config.get("openid-connect", "secret") oidc_server = server_config.get("openid-connect", "server_url") except (NoOptionError, NoSectionError): pass else: - expected_results["openid-connect"] = { + expected_results["openid"] = { "client": oidc_client, "realm": oidc_realm, - "secret": oidc_secret, "server": oidc_server, } diff --git a/lib/pbench/test/unit/server/test_shell_cli.py b/lib/pbench/test/unit/server/test_shell_cli.py index 12f56ec45d..9ac5d67f00 100644 --- a/lib/pbench/test/unit/server/test_shell_cli.py +++ b/lib/pbench/test/unit/server/test_shell_cli.py @@ -268,9 +268,17 @@ def test_main_crontab_failed(monkeypatch, make_logger, mock_get_server_config): def immediate_success(*args, **kwargs): pass + def wait_for_oidc_server( + server_config: PbenchServerConfig, logger: logging.Logger + ) -> str: + return "https://oidc.example.com" + def generate_crontab_if_necessary(*args, **kwargs) -> int: return 43 + monkeypatch.setattr( + shell.OpenIDClient, "wait_for_oidc_server", wait_for_oidc_server + ) monkeypatch.setattr(shell.site, "ENABLE_USER_SITE", False) monkeypatch.setattr(shell, "wait_for_uri", immediate_success) monkeypatch.setattr(shell, "init_indexing", immediate_success) @@ -299,6 +307,11 @@ def init_indexing(*args, **kwargs) -> int: called[0] = True raise init_indexing_exc + monkeypatch.setattr( + shell.OpenIDClient, + "wait_for_oidc_server", + lambda config, logger: "https://oidc.example.com", + ) monkeypatch.setattr(shell.site, "ENABLE_USER_SITE", False) monkeypatch.setattr(shell, "wait_for_uri", immediate_success) monkeypatch.setattr(shell, "init_indexing", init_indexing) @@ -325,6 +338,12 @@ def init_db(*args, **kwargs) -> int: called[0] = True raise init_db_exc + monkeypatch.setattr( + shell.OpenIDClient, + "wait_for_oidc_server", + lambda config, logger: "https://oidc.example.com", + ) + monkeypatch.setattr(shell.site, "ENABLE_USER_SITE", False) monkeypatch.setattr(shell, "wait_for_uri", immediate_success) monkeypatch.setattr(shell, "init_db", init_db) diff --git a/lib/pbench/test/unit/server/test_user_auth.py b/lib/pbench/test/unit/server/test_user_auth.py index 18309c3e15..e69de29bb2 100644 --- a/lib/pbench/test/unit/server/test_user_auth.py +++ b/lib/pbench/test/unit/server/test_user_auth.py @@ -1,539 +0,0 @@ -from datetime import datetime, timedelta, timezone -from http import HTTPStatus -import time - -from pbench.server.database.database import Database -from pbench.server.database.models.auth_tokens import AuthToken -from pbench.server.database.models.users import User -from pbench.test.unit.server.conftest import admin_username - - -def register_user( - client, server_config, email, username, password, firstname, lastname -): - """ - Helper function to register a user using register API - """ - return client.post( - f"{server_config.rest_uri}/register", - json={ - "email": email, - "password": password, - "username": username, - "first_name": firstname, - "last_name": lastname, - }, - ) - - -def login_user(client, server_config, username, password, token_expiry=None): - """ - Helper function to generate a user authentication token - """ - return client.post( - f"{server_config.rest_uri}/login", - json={"username": username, "password": password, "token_expiry": token_expiry}, - ) - - -class TestUserAuthentication: - @staticmethod - def test_registration(client, server_config, tmp_path): - client.config["SESSION_FILE_DIR"] = tmp_path - """ Test for user registration """ - with client: - response = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert response.content_type == "application/json" - assert response.status_code, HTTPStatus.CREATED - - @staticmethod - def test_registration_missing_fields(client, server_config): - """Test for user registration missing fields""" - with client: - response = client.post( - f"{server_config.rest_uri}/register", - json={ - "email": "user@domain.com", - "password": "12345", - "username": "user", - }, - ) - data = response.json - assert data["message"] == "Missing first_name field" - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.BAD_REQUEST - - @staticmethod - def test_registration_with_registered_user(client, server_config): - """Test registration with already registered email""" - user = User( - email="user@domain.com", - password="12345", - username="user", - first_name="firstname", - last_name="lastname", - ) - Database.db_session.add(user) - Database.db_session.commit() - with client: - response = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - data = response.json - assert data["message"] == "Provided username is already in use." - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.FORBIDDEN - - @staticmethod - def test_user_login(client, server_config): - """Test for login of registered-user login""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - # registered user login - response = login_user(client, server_config, "user", "12345") - data = response.json - assert data["auth_token"] - assert data["username"] == "user" - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_user_relogin(client, server_config): - """Test for login of registered-user login""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # registered user login - response = login_user(client, server_config, "user", "12345") - data = response.json - assert data["auth_token"] - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.OK - first_auth_token = data["auth_token"] - - # Re-login with auth header - time.sleep(1) - response = client.post( - f"{server_config.rest_uri}/login", - headers=dict(Authorization=f"Bearer {first_auth_token}"), - json={"username": "user", "password": "12345"}, - ) - assert response.status_code == HTTPStatus.OK - second_auth_token = response.json["auth_token"] - assert ( - first_auth_token != second_auth_token - ), "Second login returned first token" - - # Re-login without auth header - time.sleep(1) - response = client.post( - f"{server_config.rest_uri}/login", - json={"username": "user", "password": "12345"}, - ) - assert response.status_code == HTTPStatus.OK - third_auth_token = response.json["auth_token"] - assert ( - first_auth_token != third_auth_token - ), "Third login returned first token" - assert ( - second_auth_token != third_auth_token - ), "Third login returned second token" - - @staticmethod - def test_login_removes_expired_tokens(client, server_config): - """Test to ensure on a new login previously expired tokens are - removed.""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="me", - firstname="first name", - lastname="last name", - email="me@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - user = User.query(username="me") - # Add 3 expired tokens, values are in "days" old - for age in (1, 2, 4): - token = AuthToken( - token=f"token-that-is-{age:d}-days-old", - expiration=datetime.now(timezone.utc) - timedelta(days=age), - ) - user.add_token(token) - - response = client.post( - f"{server_config.rest_uri}/login", - json={"username": "me", "password": "12345"}, - ) - assert response.status_code == HTTPStatus.OK - - for age in (1, 2, 4): - token = AuthToken.query(f"token-that-is-{age:d}-days-old") - assert token is None - - @staticmethod - def test_user_login_with_wrong_password(client, server_config): - """Test for login of registered-user login""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # registered user login - response = login_user(client, server_config, "user", "123456") - data = response.json - assert data["message"] == "Bad login" - assert response.content_type == "application/json" - assert response.status_code == HTTPStatus.UNAUTHORIZED - - @staticmethod - def test_login_without_password(client, server_config): - """Test for login of non-registered user""" - with client: - response = client.post( - f"{server_config.rest_uri}/login", - json={"username": "username"}, - ) - data = response.json - assert data["message"] == "Please provide a valid password" - assert response.status_code == HTTPStatus.BAD_REQUEST - - @staticmethod - def test_non_registered_user_login(client, server_config): - """Test for login of non-registered user""" - with client: - response = login_user(client, server_config, "username", "12345") - data = response.json - assert data["message"] == "Bad login" - assert response.status_code == HTTPStatus.UNAUTHORIZED - - @staticmethod - def test_get_user(client, server_config): - """Test for get user api""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = login_user(client, server_config, "username", "12345") - assert response.status_code == HTTPStatus.OK - data_login = response.json - response = client.get( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - data = response.json - assert response.status_code == HTTPStatus.OK - assert data is not None - assert data["email"] == "user@domain.com" - assert data["username"] == "username" - assert data["first_name"] == "firstname" - - @staticmethod - def test_update_user(client, server_config): - """Test for get user api""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = login_user(client, server_config, "username", "12345") - assert response.status_code == HTTPStatus.OK - data_login = response.json - - new_registration_time = datetime.now() - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"registered_on": new_registration_time, "first_name": "newname"}, - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.FORBIDDEN - data = response.json - assert data["message"] == "Invalid update request payload" - - # Test password update - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"password": "newpass"}, - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - data = response.json - assert response.status_code == HTTPStatus.OK - assert data["first_name"] == "firstname" - assert data["email"] == "user@domain.com" - time.sleep(1) - response = login_user(client, server_config, "username", "newpass") - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_external_token_update(client, server_config): - """Test for external attempt at updating auth token""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = login_user(client, server_config, "username", "12345") - assert response.status_code == HTTPStatus.OK - data_login = response.json - - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"auth_tokens": "external_auth_token"}, - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.BAD_REQUEST - data = response.json - assert data["message"] == "Invalid fields in update request payload" - - @staticmethod - def test_malformed_auth_token(client, server_config): - """Test for user status for malformed auth token""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - response = client.get( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer" + "malformed"), - ) - data = response.json - assert data is None - - @staticmethod - def test_valid_logout(client, server_config): - """Test for logout before token expires""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="user", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "user", "12345") - data_login = resp_login.json - assert data_login["auth_token"] - # valid token logout - response = client.post( - f"{server_config.rest_uri}/logout", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.OK - # Check if the token has been successfully removed from the database - assert ( - not Database.db_session.query(AuthToken) - .filter_by(token=data_login["auth_token"]) - .first() - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_invalid_logout(client, server_config): - """Testing logout after the token expires""" - with client: - # user registration - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "username", "12345") - data_login = resp_login.json - assert resp_login.status_code == HTTPStatus.OK - assert data_login["auth_token"] - - # log out with the current token - logout_response = client.post( - f"{server_config.rest_uri}/logout", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert logout_response.status_code == HTTPStatus.OK - - # Logout using invalid token - # Expect 200 on response, since the invalid token can not be used anymore - response = client.post( - f"{server_config.rest_uri}/logout", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_delete_user(client, server_config): - """Test for user status for malformed auth token""" - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "username", "12345") - data_login = resp_login.json - assert data_login["auth_token"] - - response = client.delete( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_non_existent_user_delete(client, server_config): - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # user login - resp_login = login_user(client, server_config, "username", "12345") - data_login = resp_login.json - assert data_login["auth_token"] - - response = client.delete( - f"{server_config.rest_uri}/user/username1", - headers=dict(Authorization="Bearer " + data_login["auth_token"]), - ) - assert response.status_code == HTTPStatus.FORBIDDEN - assert response.json["message"] == "Not authorized to access user username1" - - @staticmethod - def test_admin_access(client, server_config, pbench_admin_token): - with client: - resp_register = register_user( - client, - server_config, - username="username", - firstname="firstname", - lastname="lastName", - email="user@domain.com", - password="12345", - ) - assert resp_register.status_code == HTTPStatus.CREATED - - # Update user with admin credentials - response = client.put( - f"{server_config.rest_uri}/user/username", - json={"first_name": "newname"}, - headers=dict(Authorization="Bearer " + pbench_admin_token), - ) - assert response.status_code == HTTPStatus.OK - - # Delete user with admin credentials - response = client.delete( - f"{server_config.rest_uri}/user/username", - headers=dict(Authorization="Bearer " + pbench_admin_token), - ) - assert response.status_code == HTTPStatus.OK - - @staticmethod - def test_admin_delete(client, server_config, pbench_admin_token): - # Delete admin user with admin credentials - # We should not be able to delete an admin user - response = client.delete( - f"{server_config.rest_uri}/user/{admin_username}", - headers=dict(Authorization="Bearer " + pbench_admin_token), - ) - assert response.status_code == HTTPStatus.FORBIDDEN - assert response.json["message"] == "Not authorized to delete user" diff --git a/server/lib/config/pbench-server-default.cfg b/server/lib/config/pbench-server-default.cfg index e5eed70514..f64cf8a756 100644 --- a/server/lib/config/pbench-server-default.cfg +++ b/server/lib/config/pbench-server-default.cfg @@ -101,7 +101,7 @@ wait_timeout = 120 realm = pbench-server # Client entity name requesting OIDC to authenticate a user. -client = pbench-server-client +client = pbench-dashboard [logging] logger_type = devlog