-
Notifications
You must be signed in to change notification settings - Fork 108
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use of oidc configuration in the server unit test as default
- Loading branch information
Showing
11 changed files
with
162 additions
and
596 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,13 +10,17 @@ | |
from stat import ST_MTIME | ||
import tarfile | ||
from typing import Dict, Optional | ||
from urllib.parse import urljoin | ||
import uuid | ||
|
||
from cryptography.hazmat.primitives.asymmetric import rsa | ||
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat | ||
from email_validator import EmailNotValidError, ValidatedEmail | ||
from freezegun import freeze_time | ||
import jwt | ||
import pytest | ||
from requests import Response | ||
import responses | ||
|
||
from pbench.common import MetadataLog | ||
from pbench.common.logger import get_pbench_logger | ||
|
@@ -25,7 +29,6 @@ | |
import pbench.server.auth.auth as Auth | ||
from pbench.server.database import init_db | ||
from pbench.server.database.database import Database | ||
from pbench.server.database.models.active_tokens import ActiveTokens | ||
from pbench.server.database.models.datasets import Dataset, Metadata | ||
from pbench.server.database.models.template import Template | ||
from pbench.server.database.models.users import User | ||
|
@@ -49,6 +52,9 @@ | |
host = elasticsearch.example.com | ||
port = 7080 | ||
[openid-connect] | ||
server_url = http://openid.example.com | ||
[logging] | ||
logger_type = null | ||
# We run with DEBUG level logging during the server unit tests to help | ||
|
@@ -69,6 +75,9 @@ | |
admin_email = "[email protected]" | ||
generic_password = "12345" | ||
jwt_secret = "my_precious" | ||
# oidc public client name needs to match with the one coming | ||
# from the config file | ||
oidc_public_client = "pbench-dashboard" | ||
|
||
|
||
def do_setup(tmp_d: Path) -> Path: | ||
|
@@ -124,8 +133,57 @@ def server_config(on_disk_server_config) -> PbenchServerConfig: | |
return server_config | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def rsa_keys(): | ||
"""Fixture for generating an RSA public / private key pair. | ||
Returns: | ||
A dictionary containing the RSAPrivateKey object, the PEM encoded public | ||
key string without the BEGIN/END bookends (mimicing what is returned by | ||
an OpenID Connect broker), and the PEM encoded public key string with | ||
the BEGIN/END bookends | ||
""" | ||
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) | ||
pem_public_key = ( | ||
private_key.public_key() | ||
.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo) | ||
.decode() | ||
) | ||
# Strip "-----BEGIN..." and "-----END...", and the empty element resulting | ||
# from the trailing newline character. | ||
public_key_l = pem_public_key.split("\n")[1:-2] | ||
public_key = "".join(public_key_l) | ||
return { | ||
"private_key": private_key, | ||
"public_key": public_key, | ||
"pem_public_key": pem_public_key, | ||
} | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def add_auth_connection_mock(server_config, rsa_keys): | ||
""" | ||
Mocks the OIDC public key GET Requests call on the realm uri. | ||
Args: | ||
server_config: Server_config fixture | ||
rsa_keys: rsa_keys fixture to get te public key | ||
""" | ||
with responses.RequestsMock() as mock: | ||
oidc_server = server_config.get("openid-connect", "server_url") | ||
oidc_realm = server_config.get("openid-connect", "realm") | ||
url = urljoin(oidc_server, f"realms/{oidc_realm}") | ||
|
||
mock.add( | ||
responses.GET, | ||
url, | ||
status=HTTPStatus.OK, | ||
json={"public_key": rsa_keys["public_key"]}, | ||
) | ||
yield mock | ||
|
||
|
||
@pytest.fixture() | ||
def client(monkeypatch, server_config, fake_email_validator): | ||
def client(monkeypatch, server_config, fake_email_validator, add_auth_connection_mock): | ||
"""A test client for the app. | ||
Fixtures: | ||
|
@@ -760,25 +818,37 @@ def fake_find(name: str) -> Optional[Template]: | |
|
||
|
||
@pytest.fixture() | ||
def pbench_admin_token(client, create_admin_user): | ||
"""Internal valid token for the 'ADMIN' user""" | ||
return generate_token(user=create_admin_user, username=admin_username) | ||
def pbench_admin_token(client, create_admin_user, rsa_keys): | ||
"""OIDC valid token for the 'ADMIN' user""" | ||
return generate_token( | ||
user=create_admin_user, | ||
private_key=rsa_keys["private_key"], | ||
username=admin_username, | ||
pbench_client_roles=["ADMIN"], | ||
) | ||
|
||
|
||
@pytest.fixture() | ||
def pbench_drb_token(client, create_drb_user): | ||
"""Internal valid token for the 'drb' user""" | ||
return generate_token(username="drb", user=create_drb_user) | ||
def pbench_drb_token(client, create_drb_user, rsa_keys): | ||
"""OIDC valid token for the 'drb' user""" | ||
return generate_token( | ||
username="drb", private_key=rsa_keys["private_key"], user=create_drb_user | ||
) | ||
|
||
|
||
@pytest.fixture() | ||
def pbench_drb_token_invalid(client, create_drb_user): | ||
"""Internal invalid token for the 'drb' user""" | ||
return generate_token(username="drb", user=create_drb_user, valid=False) | ||
def pbench_drb_token_invalid(client, create_drb_user, rsa_keys): | ||
"""OIDC invalid token for the 'drb' user""" | ||
return generate_token( | ||
username="drb", | ||
private_key=rsa_keys["private_key"], | ||
user=create_drb_user, | ||
valid=False, | ||
) | ||
|
||
|
||
@pytest.fixture() | ||
def get_token_func(pbench_admin_token): | ||
def get_token_func(pbench_admin_token, rsa_keys): | ||
"""Get the token function for fetching the token for a user | ||
This fixture yields a function value which can be called to get the internal | ||
|
@@ -789,17 +859,21 @@ def get_token_func(pbench_admin_token): | |
generated. | ||
""" | ||
return lambda user: ( | ||
pbench_admin_token if user == admin_username else generate_token(username=user) | ||
pbench_admin_token | ||
if user == admin_username | ||
else generate_token(username=user, private_key=rsa_keys["private_key"]) | ||
) | ||
|
||
|
||
def generate_token( | ||
username: str, | ||
private_key: str, | ||
user: Optional[User] = None, | ||
pbench_client_roles: Optional[list[str]] = None, | ||
valid: bool = True, | ||
) -> str: | ||
"""Generates an internal JWT token that mimics a real internal token | ||
obtained from an internal user login. | ||
"""Generates an OIDC JWT token that mimics a real OIDC token | ||
obtained from the user login. | ||
Args: | ||
username : username to include in the token payload | ||
|
@@ -823,10 +897,35 @@ def generate_token( | |
"iat": current_utc, | ||
"exp": exp, | ||
"sub": user.id, | ||
"aud": oidc_public_client, | ||
"azp": oidc_public_client, | ||
"realm_access": { | ||
"roles": [ | ||
"default-roles-pbench-server", | ||
"offline_access", | ||
"uma_authorization", | ||
] | ||
}, | ||
"resource_access": { | ||
"broker": {"roles": ["read-token"]}, | ||
"account": { | ||
"roles": ["manage-account", "manage-account-links", "view-profile"] | ||
}, | ||
}, | ||
"scope": "openid profile email", | ||
"sid": "1988612e-774d-43b8-8d4a-bbc05ee55edb", | ||
"email_verified": True, | ||
"name": user.first_name + " " + user.last_name, | ||
"preferred_username": username, | ||
"given_name": user.first_name, | ||
"family_name": user.last_name, | ||
"email": user.email, | ||
} | ||
token_str = jwt.encode(payload, jwt_secret, algorithm="HS256") | ||
token = ActiveTokens(auth_token=token_str) | ||
user.update(auth_tokens=token) | ||
if pbench_client_roles: | ||
payload["resource_access"].update( | ||
{f"{oidc_public_client}": {"roles": pbench_client_roles}} | ||
) | ||
token_str = jwt.encode(payload, private_key, algorithm="RS256") | ||
return token_str | ||
|
||
|
||
|
Oops, something went wrong.