Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Purge DeliveryCfg #1094

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions delivery/jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import dacite
import jwt

import model.delivery

logger = logging.getLogger(__name__)
JWT_KEY = 'bearer_token'
Expand Down Expand Up @@ -66,28 +65,6 @@ def from_dict(data: dict) -> typing.Self:
),
)

@staticmethod
def from_signing_cfg(signing_cfg: model.delivery.SigningCfg) -> typing.Self:
algorithm = Algorithm(signing_cfg.algorithm().upper())
use = Use.SIGNATURE
kid = signing_cfg.id()

if algorithm == Algorithm.RS256:
public_key = Crypto.PublicKey.RSA.import_key(signing_cfg.public_key())

return RSAPublicKey(
use=use,
kid=kid,
n=encodeBase64urlUInt(public_key.n),
e=encodeBase64urlUInt(public_key.e),
)
elif algorithm == Algorithm.HS256:
return SymmetricKey(
use=use,
kid=kid,
k=encodeBase64url(signing_cfg.secret().encode('utf-8')),
)


@dataclasses.dataclass(frozen=True, kw_only=True)
class RSAPublicKey(JSONWebKey):
Expand Down
128 changes: 2 additions & 126 deletions model/delivery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,134 +3,10 @@
# SPDX-License-Identifier: Apache-2.0


import collections.abc
import enum
import model.base

from model.base import (
ModelBase,
NamedModelElement,
)


class DeliveryConfig(NamedModelElement):
'''
Not intended to be instantiated by users of this module
'''
def auth(self):
return Auth(self.raw.get('auth'))

def service(self):
return DeliverySvcCfg(self.raw.get('service'))

def dashboard(self):
return DeliveryDashboardCfg(self.raw.get('dashboard'))

def db_cfg_name(self):
return self.raw.get('db_cfg_name')

def _optional_attributes(self):
yield from super()._optional_attributes()
yield from [
'db_cfg_name',
]

def _required_attributes(self):
yield from super()._required_attributes()
yield from [
'service',
]


class OAuthType(enum.Enum):
GITHUB = 'github'


class Auth(ModelBase):
def oauth_cfgs(self):
return [OAuth(raw) for raw in self.raw.get('oauth_cfgs')]

def oauth_cfg(self, github_cfg):
for oc in self.oauth_cfgs():
if oc.github_cfg() == github_cfg:
return oc
raise KeyError(f'no oauth cfg for {github_cfg}')


class OAuth(ModelBase):
def name(self) -> str:
return self.raw.get('name')

def type(self):
return OAuthType(self.raw.get('type'))

def github_cfg(self):
return self.raw.get('github_cfg')

def oauth_url(self):
return self.raw.get('oauth_url')

def token_url(self):
return self.raw.get('token_url')

def client_id(self):
return self.raw.get('client_id')

def client_secret(self):
return self.raw.get('client_secret')

def scope(self):
return self.raw.get('scope')

def role_bindings(self):
return self.raw.get('role_bindings', [])


class SigningCfg(ModelBase):
def id(self) -> str:
return self.raw.get('id')

def algorithm(self):
return self.raw.get('algorithm', 'HS256')

def secret(self):
return self.raw.get('secret')

def public_key(self):
return self.raw.get('public_key')

def purpose_labels(self) -> list[str]:
return self.raw.get('purpose_labels', [])


class DeliveryDashboardCfg(ModelBase):
def deployment_name(self):
return self.raw.get('deployment_name', 'delivery-dashboard')


class DeliverySvcCfg(ModelBase):
def deployment_name(self):
return self.raw.get('deployment_name', 'delivery-service')

def signing_cfgs(
self,
purpose_label: str = None,
) -> collections.abc.Generator[SigningCfg, None, None]:
cfgs = self.raw.get('signing')

if not purpose_label:
yield from [
SigningCfg(raw) for raw in cfgs
]
return

for raw_signing_cfg in cfgs:
signing_cfg = SigningCfg(raw_signing_cfg)

if purpose_label in signing_cfg.purpose_labels():
yield signing_cfg


class DeliveryEndpointsCfg(NamedModelElement):
class DeliveryEndpointsCfg(model.base.NamedModelElement):
def service_host(self):
return self.raw['service_host']

Expand Down
38 changes: 18 additions & 20 deletions test/delivery/jwt_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,28 @@
import jwt

import delivery.jwt
import model.delivery


ISSUER = 'test-issuer'
private_key = Crypto.PublicKey.RSA.generate(4096)
public_key = private_key.public_key()


@pytest.fixture
def signing_cfg() -> model.delivery.SigningCfg:
private_key = Crypto.PublicKey.RSA.generate(4096)

return model.delivery.SigningCfg({
'id': '0',
'algorithm': delivery.jwt.Algorithm.RS256,
'secret': private_key.export_key(format='PEM'),
'public_key': private_key.public_key().export_key(format='PEM'),
'purpose_labels': [],
})
def json_web_key() -> delivery.jwt.JSONWebKey:
return delivery.jwt.JSONWebKey.from_dict(
data={
'use': delivery.jwt.Use.SIGNATURE,
'kid': 'foo',
'alg': delivery.jwt.Algorithm.RS256,
'n': delivery.jwt.encodeBase64urlUInt(public_key.n),
'e': delivery.jwt.encodeBase64urlUInt(private_key.e)
}
)


@pytest.fixture
def token(signing_cfg) -> str:
def token() -> str:
now = datetime.datetime.now(tz=datetime.timezone.utc)
time_delta = datetime.timedelta(days=730) # 2 years

Expand All @@ -39,21 +41,17 @@ def token(signing_cfg) -> str:
'email_address': '[email protected]',
},
'exp': int((now + time_delta).timestamp()),
'key_id': signing_cfg.id(),
'key_id': '0',
}

return jwt.encode(
payload=token,
key=signing_cfg.secret(),
algorithm=signing_cfg.algorithm(),
key=private_key.export_key(format='PEM'),
algorithm=delivery.jwt.Algorithm.RS256,
)


def test_jwt(signing_cfg, token):
json_web_key = delivery.jwt.JSONWebKey.from_signing_cfg(
signing_cfg=signing_cfg,
)

def test_jwt(json_web_key, token):
# token was just created and thus is not expired yet
assert not delivery.jwt.is_jwt_token_expired(
token=token,
Expand Down
Loading