-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
merge + removed debugger and made button for implementing the compone…
…nt's statements dependent on request being set to approve
- Loading branch information
Showing
5 changed files
with
145 additions
and
48 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import time | ||
from urllib.parse import urlencode | ||
import json | ||
|
||
from django.conf import settings | ||
from django.core.exceptions import SuspiciousOperation | ||
|
@@ -9,37 +10,129 @@ | |
from mozilla_django_oidc.auth import OIDCAuthenticationBackend, LOGGER | ||
from mozilla_django_oidc.middleware import SessionRefresh | ||
from mozilla_django_oidc.utils import absolutify, add_state_and_nonce_to_session | ||
from base64 import urlsafe_b64encode, urlsafe_b64decode | ||
|
||
from siteapp.models import Portfolio, User | ||
|
||
from siteapp.models import Portfolio | ||
|
||
|
||
class OIDCAuth(OIDCAuthenticationBackend): | ||
|
||
# override get_user method to debug token | ||
def get_userinfo(self, access_token, id_token, payload): | ||
"""Return user details dictionary. The id_token and payload are not used in | ||
the default implementation, but may be used when overriding this method""" | ||
import requests | ||
user_response = requests.get( | ||
self.OIDC_OP_USER_ENDPOINT, | ||
headers={ | ||
'Authorization': 'Bearer {0}'.format(access_token) | ||
}, | ||
verify=self.get_settings('OIDC_VERIFY_SSL', True), | ||
timeout=self.get_settings('OIDC_TIMEOUT', None), | ||
proxies=self.get_settings('OIDC_PROXY', None)) | ||
user_response.raise_for_status() | ||
# LOGGER.warning(f"user info, {type(user_response.text)}, {user_response.text}") | ||
# split on ".": Header.Payload.Signature | ||
header, payload, signature = [self.parse_b64url(content) for content in user_response.text.split(".")] | ||
header = json.loads(header.decode('UTF-8')) | ||
payload = payload[:-1] if b'\x1b' in payload else payload | ||
payload = json.loads(payload.decode('UTF-8)')) | ||
# LOGGER.warning(f"header: {header}, \npayload: {payload}, \nsignature: {signature}") | ||
#return user_response.json() | ||
return payload | ||
|
||
def parse_b64url(self, content): | ||
"""Return decoded base64url content""" | ||
|
||
try: | ||
decoded = urlsafe_b64decode(content+str(b'=======')) | ||
except: | ||
decoded = urlsafe_b64decode(content) | ||
return decoded | ||
|
||
def is_admin(self, groups): | ||
if settings.OIDC_ROLES_MAP["admin"] in groups: | ||
return True | ||
return False | ||
|
||
# override verify_claims to address custom OIDC_RP_SCOPES defined | ||
def verify_claims(self, claims): | ||
"""Verify the provided claims to decide if authentication should be allowed.""" | ||
|
||
# Verify claims required by default configuration | ||
scopes = self.get_settings('OIDC_RP_SCOPES', 'openid email') | ||
if 'email' in scopes.split(): | ||
return 'email' in claims | ||
|
||
LOGGER.warning('Custom OIDC_RP_SCOPES defined. ' | ||
'You need to override `verify_claims` for custom claims verification.') | ||
|
||
# Custom examination of OIDC_RP_SCOPES | ||
# LOGGER.warning(f"\n DEBUG custom OIDC_RP_SCOPES (1):", OIDC_RP_SCOPES) | ||
|
||
return True | ||
|
||
# override get_or_create_user method | ||
def get_or_create_user(self, access_token, id_token, payload): | ||
"""Returns a User instance if 1 user is found. Creates a user if not found | ||
and configured to do so. Returns nothing if multiple users are matched.""" | ||
|
||
user_info = self.get_userinfo(access_token, id_token, payload) | ||
# LOGGER.warning("\n DEBUG user_info (1):", user_info) | ||
|
||
claims_verified = self.verify_claims(user_info) | ||
if not claims_verified: | ||
msg = 'Claims verification failed' | ||
raise SuspiciousOperation(msg) | ||
|
||
# LOGGER.warning("\n DEBUG user_info (2):", user_info) | ||
|
||
# email based filtering | ||
#users = self.filter_users_by_claims(user_info) | ||
users = User.objects.filter(username=user_info.get('preferred_username', None)) | ||
|
||
# LOGGER.warning("\n DEBUG user (3):", users) | ||
|
||
if len(users) == 1: | ||
return self.update_user(users[0], user_info) | ||
elif len(users) > 1: | ||
# In the rare case that two user accounts have the same email address, | ||
# bail. Randomly selecting one seems really wrong. | ||
msg = 'Multiple users returned' | ||
raise SuspiciousOperation(msg) | ||
elif self.get_settings('OIDC_CREATE_USER', True): | ||
user = self.create_user(user_info) | ||
return user | ||
else: | ||
LOGGER.debug('Login failed: No user with %s found, and ' | ||
'OIDC_CREATE_USER is False', | ||
self.describe_user_by_claims(user_info)) | ||
return None | ||
|
||
def create_user(self, claims): | ||
data = {'email': claims[settings.OIDC_CLAIMS_MAP['email']], | ||
'first_name': claims[settings.OIDC_CLAIMS_MAP['first_name']], | ||
'last_name': claims[settings.OIDC_CLAIMS_MAP['last_name']], | ||
'username': claims[settings.OIDC_CLAIMS_MAP['username']], | ||
'is_staff': self.is_admin(claims[settings.OIDC_CLAIMS_MAP['groups']])} | ||
|
||
# TODO: Better handling if no 'username' set. Current approach will cause duplicate record error | ||
# TODO: Is the below sufficiently generic for different customizations for a customer? | ||
data = {'email': claims.get(settings.OIDC_CLAIMS_MAP['email'], "[email protected]"), | ||
'first_name': claims.get(settings.OIDC_CLAIMS_MAP['first_name'], "first_name"), | ||
'last_name': claims.get(settings.OIDC_CLAIMS_MAP['last_name'], "last_name"), | ||
'username': claims.get(settings.OIDC_CLAIMS_MAP['username'], "username01"), | ||
'is_staff': False} | ||
|
||
user = self.UserModel.objects.create_user(**data) | ||
portfolio = Portfolio.objects.create(title=user.email.split('@')[0], description="Personal Portfolio") | ||
portfolio.assign_owner_permissions(user) | ||
if user.default_portfolio is None: | ||
portfolio = user.create_default_portfolio_if_missing() | ||
return user | ||
|
||
def update_user(self, user, claims): | ||
original_values = [getattr(user, x.name) for x in user._meta.get_fields() if hasattr(user, x.name)] | ||
|
||
user.email = claims[settings.OIDC_CLAIMS_MAP['email']] | ||
user.first_name = claims[settings.OIDC_CLAIMS_MAP['first_name']] | ||
user.last_name = claims[settings.OIDC_CLAIMS_MAP['last_name']] | ||
user.username = claims[settings.OIDC_CLAIMS_MAP['username']] | ||
groups = claims[settings.OIDC_CLAIMS_MAP['groups']] | ||
user.email = claims.get(settings.OIDC_CLAIMS_MAP['email'], "[email protected]") | ||
user.first_name = claims.get(settings.OIDC_CLAIMS_MAP['first_name'], "missing first_name") | ||
user.last_name = claims.get(settings.OIDC_CLAIMS_MAP['last_name'], "missing last_name") | ||
user.username = claims.get(settings.OIDC_CLAIMS_MAP['username'], "missing username") | ||
groups = claims.get(settings.OIDC_CLAIMS_MAP['groups'], "missing groups") | ||
user.is_staff = self.is_admin(groups) | ||
user.is_superuser = user.is_staff | ||
|
||
|
@@ -97,14 +190,14 @@ def authenticate(self, request, **kwargs): | |
class OIDCSessionRefresh(SessionRefresh): | ||
def process_request(self, request): | ||
if not self.is_refreshable_url(request): | ||
LOGGER.debug('request is not refreshable') | ||
# LOGGER.debug('request is not refreshable') | ||
return | ||
|
||
expiration = request.session.get('oidc_id_token_expiration', 0) | ||
now = time.time() | ||
if expiration > now: | ||
# The id_token is still valid, so we don't have to do anything. | ||
LOGGER.debug('id token is still valid (%s > %s)', expiration, now) | ||
# LOGGER.debug('id token is still valid (%s > %s)', expiration, now) | ||
return | ||
|
||
LOGGER.debug('id token has expired') | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters