Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
akx committed Oct 30, 2019
0 parents commit bce6750
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
*.egg-info
*.py[cod]
.cache
.coverage
.eggs
.idea
.tox
.venv
build/
dist/
htmlcov/
22 changes: 22 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

The MIT License (MIT)

Copyright (c) 2019 Valohai

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
oidckit
=======

Unobtrusive pluggable OpenID Connect consumer toolkit

Usage
-----

Construct a configuration class and a client class.
This example is for [Microsoft Identity Platform / Azure AD 2.0](https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-v2-protocols) OIDC flows, based on [the published OIDC configuration](https://login.microsoftonline.com/common/v2.0/.well-known/openid-configuration).
For other providers, you may wish to also override some of the `Provider` functions.

```python
class AzureADOIDCConfig(OIDCProviderConfiguration):
op_authorization_endpoint = 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize'
op_jwks_endpoint = 'https://login.microsoftonline.com/common/discovery/v2.0/keys'
op_token_endpoint = 'https://login.microsoftonline.com/common/oauth2/v2.0/token'
op_user_endpoint = 'https://graph.microsoft.com/oidc/userinfo'
rp_scopes = 'openid email profile'
rp_sign_algo = 'RS256'
rp_client_id = 'your application ID here'
rp_client_secret = 'your application secret here'


class AzureADOIDCProvider(OIDCProvider):
config = AzureADOIDCConfig()
```

In your web framework of choice, implement views/URL endpoints to initiate the OIDC dance and to receive the access code.
This example is for Django, using old-school function-based views.

First, the initiation view. Note you'll need to store the `auth_state` within the authentication request
returned somewhere you can retrieve it when the client is redirected back to the callback view.

```python
def initiate_login(request):
with AzureADOIDCProvider() as provider:
auth_req = build_authentication_request(
provider=provider,
request=request,
redirect_uri='http://absolute-url-to-your-callback-view/',
)
request.session['auth_state'] = auth_req.auth_state.asdict()
return HttpResponseRedirect(auth_req.redirect_url)
```

Then, the callback view, using the same provider class, and the state we just stashed. You'll need to pass the
`code` and `state` querystring parameters the OIDC provider passes to the library, too.

```python
def authentication_callback(request):
with AzureADOIDCProvider() as provider:
auth_resp = process_callback_data(
auth_state=request.session.pop('auth_state'),
code=request.GET.get('code'),
provider=provider,
state=request.GET.get('state'),
)
```

Congratulations! If everything went fine, `auth_resp` will contain token data from the IDP.
You can use this – according to the instructions of the IDP, of course – to sign up an user, log them in, etc.

For instance, for MSIP, [`auth_resp.decode_id_token()['sub']` will contain an identifier](https://docs.microsoft.com/en-gb/azure/active-directory/develop/id-tokens)
uniquely identifying the user (for your application!).

Acknowledgements
----------------

* This project was born as a way to make [`mozilla-django-oidc` project](https://github.com/mozilla/mozilla-django-oidc/)
less opinionated and less Django-reliant.
1 change: 1 addition & 0 deletions oidckit/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.1"
53 changes: 53 additions & 0 deletions oidckit/crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import secrets
import string

from josepy import JWK, JWS, Header

from oidckit.excs import OIDCError


def decode_jws(
payload: bytes, key: dict, expected_algorithm: str, verify: bool = True
) -> dict:
jws = JWS.from_compact(payload)
if verify:
try:
alg = jws.signature.combined.alg.name
except KeyError as exc:
raise OIDCError("No alg value found in header") from exc

if alg != expected_algorithm:
raise OIDCError(
f"Algorithm mismatch: offered {alg} is not expected {expected_algorithm}"
)

jwk = JWK.from_json(key)
if not jws.verify(jwk):
raise OIDCError("JWS token verification failed.")

return jws.payload


def get_key_from_keyset_json(keyset_json: dict, token: bytes) -> dict:
jws = JWS.from_compact(token)
header = Header.json_loads(jws.signature.protected)
expected_kid = str(header.kid)
expected_alg = str(header.alg)

for jwk in keyset_json["keys"]:
if jwk["kid"] != expected_kid:
continue
jwk_alg = jwk.get("alg")
if jwk_alg and jwk_alg != expected_alg:
raise OIDCError(
f"kid {header.kid} has alg {jwk_alg}, was expecting {header.alg}"
)
return jwk
raise OIDCError(f"Keyset has no matching key for kid {expected_kid}.")


def get_random_string(
length=12,
keyspace=(string.ascii_lowercase + string.ascii_uppercase + string.digits),
):
return "".join(secrets.choice(keyspace) for i in range(length))
20 changes: 20 additions & 0 deletions oidckit/excs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import requests


class OIDCError(Exception):
pass


class RemoteError(requests.RequestException, OIDCError):
"""
Raised when a remote call fails.
"""

@classmethod
def raise_from_status(cls, response: requests.Response):
try:
response.raise_for_status()
except requests.HTTPError as he:
raise cls(
he.response.text, request=he.request, response=he.response
) from he
78 changes: 78 additions & 0 deletions oidckit/objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from typing import Optional, TYPE_CHECKING

if TYPE_CHECKING:
from oidckit.provider import OIDCProvider


class AuthenticationState:
state: str
redirect_uri: str
nonce: Optional[str] = None

def __init__(self, state, redirect_uri, nonce=None):
self.state = state
self.redirect_uri = redirect_uri
self.nonce = nonce

def asdict(self):
return vars(self)


class AuthenticationResult:
provider: "OIDCProvider"
auth_state: AuthenticationState
token: dict
_user_info: Optional[dict] = None
_decoded_id_token: Optional[dict] = None
_decoded_access_token: Optional[dict] = None

def __init__(self, provider, auth_state, token):
self.provider = provider
self.auth_state = auth_state
self.token = token

@property
def id_token(self):
return self.token["id_token"]

@property
def access_token(self):
return self.token["access_token"]

def get_user_info(self):
if not self._user_info:
self._user_info = self.provider.retrieve_user_info(auth_result=self)
return self._user_info

def decode_id_token(self) -> dict:
if not self._decoded_id_token:
self._decoded_id_token = self.provider.decode_token(
self.id_token, nonce=self.auth_state.nonce
)
return self._decoded_id_token

def decode_access_token(self, verify: bool = True) -> dict:
"""
Try to decode the access token, if any, of the token payload.
This will only succeed if the token actually _is_ a JWT token.
The token data may occasionally be signed in a way that defies
signature verification by mortal means. You can pass `verify=False`
to bypass this verification. However, `id_token` must always be
verifiable, which is why the `decode_id_token()` function does not
let you shoot yourself in the foot by even allowing `verify=False`.
"""
if not self._decoded_access_token:
self._decoded_access_token = self.provider.decode_token(
self.access_token, nonce=self.auth_state.nonce, verify=verify
)
return self._decoded_access_token


class AuthenticationRequest:
redirect_url: str
auth_state: AuthenticationState

def __init__(self, *, redirect_url, auth_state):
self.redirect_url = redirect_url
self.auth_state = auth_state
112 changes: 112 additions & 0 deletions oidckit/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import json
from typing import Optional

import requests

from oidckit.excs import OIDCError, RemoteError
from oidckit.crypto import decode_jws, get_key_from_keyset_json
from oidckit.objects import AuthenticationState, AuthenticationResult


class OIDCProviderConfiguration:
op_authorization_endpoint: str
op_jwks_endpoint: str
op_token_endpoint: str
op_user_endpoint: str
rp_client_id: str
rp_client_secret: str
rp_scopes: str = "openid email"
rp_sign_algo: str = "HS256"


class OIDCProvider:
config: OIDCProviderConfiguration
nonce_size: int = 32
state_size: int = 32

_session = None
_jwks_data = None

@property
def session(self):
if not self._session:
self._session = requests.Session()
return self._session

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()

def build_authentication_request_params(
self, *, redirect_uri: str, state: str, request=None
) -> dict:
return {
"client_id": self.config.rp_client_id,
"redirect_uri": redirect_uri,
"response_type": "code",
"scope": self.config.rp_scopes,
"state": state,
}

def build_token_request_payload(
self, *, code: str, auth_state: AuthenticationState
):
return {
"client_id": self.config.rp_client_id,
"client_secret": self.config.rp_client_secret,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": auth_state.redirect_uri,
}

def retrieve_token(self, payload: dict) -> dict:
response = self.session.post(self.config.op_token_endpoint, data=payload)
RemoteError.raise_from_status(response)
return response.json()

def retrieve_token_key(self, token) -> dict:
if self.config.rp_sign_algo.startswith("RS"):
if self.config.op_jwks_endpoint:
if not self._jwks_data:
response = self.session.get(self.config.op_jwks_endpoint)
RemoteError.raise_from_status(response)
self._jwks_data = response.json()
return get_key_from_keyset_json(
keyset_json=self._jwks_data, token=token
)
raise NotImplementedError("No idea how to get token key – subclass, please")

def get_payload_data(self, token: bytes, key: dict, verify: bool = True):
return decode_jws(
payload=token,
key=key,
expected_algorithm=self.config.rp_sign_algo,
verify=verify,
)

def decode_token(
self, token: str, nonce: Optional[str] = None, verify: bool = True
) -> dict:
token = str(token).encode("utf-8")
key = self.retrieve_token_key(token)
payload_data = self.get_payload_data(token, key, verify=verify)
payload = json.loads(payload_data)
if nonce and verify:
token_nonce = payload.get("nonce")
if nonce != token_nonce:
raise OIDCError(
f"Token nonce mismatch – expected {nonce}, got {token_nonce}"
)
return payload

def retrieve_user_info(self, auth_result: AuthenticationResult) -> Optional[dict]:
if not self.config.op_user_endpoint:
return None
user_response = self.session.get(
self.config.op_user_endpoint,
headers={"Authorization": f"Bearer {auth_result.access_token}"},
)
user_response.raise_for_status()
return user_response.json()
Loading

0 comments on commit bce6750

Please sign in to comment.