From 0020c5bd8896ac58f308d7c4591a03c7fd34c7c4 Mon Sep 17 00:00:00 2001 From: "Philipp Heil (zkdev)" Date: Mon, 9 Dec 2024 10:11:16 +0100 Subject: [PATCH] Purge `DeliveryCfg` Was mved to delivery-service in order to (further) decouple ocm-gear from cc-utils. --- delivery/jwt.py | 23 ------- model/delivery.py | 128 +------------------------------------- test/delivery/jwt_test.py | 38 ++++++----- 3 files changed, 20 insertions(+), 169 deletions(-) diff --git a/delivery/jwt.py b/delivery/jwt.py index 4041a5b4b..9d767d983 100644 --- a/delivery/jwt.py +++ b/delivery/jwt.py @@ -16,7 +16,6 @@ import dacite import jwt -import model.delivery logger = logging.getLogger(__name__) JWT_KEY = 'bearer_token' @@ -66,28 +65,6 @@ def from_dict(data: dict) -> typing.Self: ), ) - @staticmethod - def from_signing_cfg(signing_cfg: model.delivery.SigningCfg) -> typing.Self: - algorithm = Algorithm(signing_cfg.algorithm().upper()) - use = Use.SIGNATURE - kid = signing_cfg.id() - - if algorithm == Algorithm.RS256: - public_key = Crypto.PublicKey.RSA.import_key(signing_cfg.public_key()) - - return RSAPublicKey( - use=use, - kid=kid, - n=encodeBase64urlUInt(public_key.n), - e=encodeBase64urlUInt(public_key.e), - ) - elif algorithm == Algorithm.HS256: - return SymmetricKey( - use=use, - kid=kid, - k=encodeBase64url(signing_cfg.secret().encode('utf-8')), - ) - @dataclasses.dataclass(frozen=True, kw_only=True) class RSAPublicKey(JSONWebKey): diff --git a/model/delivery.py b/model/delivery.py index 3e6441a31..87b958430 100644 --- a/model/delivery.py +++ b/model/delivery.py @@ -3,134 +3,10 @@ # SPDX-License-Identifier: Apache-2.0 -import collections.abc -import enum +import model.base -from model.base import ( - ModelBase, - NamedModelElement, -) - -class DeliveryConfig(NamedModelElement): - ''' - Not intended to be instantiated by users of this module - ''' - def auth(self): - return Auth(self.raw.get('auth')) - - def service(self): - return DeliverySvcCfg(self.raw.get('service')) - - def dashboard(self): - return DeliveryDashboardCfg(self.raw.get('dashboard')) - - def db_cfg_name(self): - return self.raw.get('db_cfg_name') - - def _optional_attributes(self): - yield from super()._optional_attributes() - yield from [ - 'db_cfg_name', - ] - - def _required_attributes(self): - yield from super()._required_attributes() - yield from [ - 'service', - ] - - -class OAuthType(enum.Enum): - GITHUB = 'github' - - -class Auth(ModelBase): - def oauth_cfgs(self): - return [OAuth(raw) for raw in self.raw.get('oauth_cfgs')] - - def oauth_cfg(self, github_cfg): - for oc in self.oauth_cfgs(): - if oc.github_cfg() == github_cfg: - return oc - raise KeyError(f'no oauth cfg for {github_cfg}') - - -class OAuth(ModelBase): - def name(self) -> str: - return self.raw.get('name') - - def type(self): - return OAuthType(self.raw.get('type')) - - def github_cfg(self): - return self.raw.get('github_cfg') - - def oauth_url(self): - return self.raw.get('oauth_url') - - def token_url(self): - return self.raw.get('token_url') - - def client_id(self): - return self.raw.get('client_id') - - def client_secret(self): - return self.raw.get('client_secret') - - def scope(self): - return self.raw.get('scope') - - def role_bindings(self): - return self.raw.get('role_bindings', []) - - -class SigningCfg(ModelBase): - def id(self) -> str: - return self.raw.get('id') - - def algorithm(self): - return self.raw.get('algorithm', 'HS256') - - def secret(self): - return self.raw.get('secret') - - def public_key(self): - return self.raw.get('public_key') - - def purpose_labels(self) -> list[str]: - return self.raw.get('purpose_labels', []) - - -class DeliveryDashboardCfg(ModelBase): - def deployment_name(self): - return self.raw.get('deployment_name', 'delivery-dashboard') - - -class DeliverySvcCfg(ModelBase): - def deployment_name(self): - return self.raw.get('deployment_name', 'delivery-service') - - def signing_cfgs( - self, - purpose_label: str = None, - ) -> collections.abc.Generator[SigningCfg, None, None]: - cfgs = self.raw.get('signing') - - if not purpose_label: - yield from [ - SigningCfg(raw) for raw in cfgs - ] - return - - for raw_signing_cfg in cfgs: - signing_cfg = SigningCfg(raw_signing_cfg) - - if purpose_label in signing_cfg.purpose_labels(): - yield signing_cfg - - -class DeliveryEndpointsCfg(NamedModelElement): +class DeliveryEndpointsCfg(model.base.NamedModelElement): def service_host(self): return self.raw['service_host'] diff --git a/test/delivery/jwt_test.py b/test/delivery/jwt_test.py index b5227d3d5..d3f7643a7 100644 --- a/test/delivery/jwt_test.py +++ b/test/delivery/jwt_test.py @@ -5,26 +5,28 @@ import jwt import delivery.jwt -import model.delivery + ISSUER = 'test-issuer' +private_key = Crypto.PublicKey.RSA.generate(4096) +public_key = private_key.public_key() @pytest.fixture -def signing_cfg() -> model.delivery.SigningCfg: - private_key = Crypto.PublicKey.RSA.generate(4096) - - return model.delivery.SigningCfg({ - 'id': '0', - 'algorithm': delivery.jwt.Algorithm.RS256, - 'secret': private_key.export_key(format='PEM'), - 'public_key': private_key.public_key().export_key(format='PEM'), - 'purpose_labels': [], - }) +def json_web_key() -> delivery.jwt.JSONWebKey: + return delivery.jwt.JSONWebKey.from_dict( + data={ + 'use': delivery.jwt.Use.SIGNATURE, + 'kid': 'foo', + 'alg': delivery.jwt.Algorithm.RS256, + 'n': delivery.jwt.encodeBase64urlUInt(public_key.n), + 'e': delivery.jwt.encodeBase64urlUInt(private_key.e) + } + ) @pytest.fixture -def token(signing_cfg) -> str: +def token() -> str: now = datetime.datetime.now(tz=datetime.timezone.utc) time_delta = datetime.timedelta(days=730) # 2 years @@ -39,21 +41,17 @@ def token(signing_cfg) -> str: 'email_address': 'test-user@email.com', }, 'exp': int((now + time_delta).timestamp()), - 'key_id': signing_cfg.id(), + 'key_id': '0', } return jwt.encode( payload=token, - key=signing_cfg.secret(), - algorithm=signing_cfg.algorithm(), + key=private_key.export_key(format='PEM'), + algorithm=delivery.jwt.Algorithm.RS256, ) -def test_jwt(signing_cfg, token): - json_web_key = delivery.jwt.JSONWebKey.from_signing_cfg( - signing_cfg=signing_cfg, - ) - +def test_jwt(json_web_key, token): # token was just created and thus is not expired yet assert not delivery.jwt.is_jwt_token_expired( token=token,