Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
BrandonSharratt committed Jul 19, 2024
1 parent 4212768 commit a602e40
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 134 deletions.
1 change: 1 addition & 0 deletions btr-api/src/btr_api/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

"""This exports all of the enums used by the application."""
from .log_level import LogLevel
from .redaction_types import RedactionType
29 changes: 29 additions & 0 deletions btr-api/src/btr_api/enums/redaction_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright © 2024 Province of British Columbia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Enum for redaction types."""
from btr_api.common.enum import BaseEnum
from btr_api.services import btr_auth


class RedactionType(BaseEnum):
"""Enum for the redaction types"""

REDACT_MONONYM = 'mono'
REDACT_MONONYM_FN = 'mono_fn'
REDACT_EMAIL = 'mono_email'
REDACT_PHONE = 'mono_phone'
REDACT_FULL = 'full'
REDACT_EMPTY = 'empty'
REDACT_DATE = 'date'
REDACT_IDENTIFIER = 'identifier'
110 changes: 4 additions & 106 deletions btr-api/src/btr_api/resources/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from btr_api.services import SchemaService
from btr_api.services import SubmissionService
from btr_api.services.validator import validate_entity
from btr_api.utils import redactInformation
from flask import Blueprint
from flask import current_app
from flask import g
Expand All @@ -64,109 +65,6 @@

bp = Blueprint('submission', __name__)

REDACT_MONONYM = 'mono'
REDACT_MONONYM_FN = 'mono_fn'
REDACT_EMAIL = 'mono_email'
REDACT_PHONE = 'mono_phone'
REDACT_FULL = 'full'
REDACT_EMPTY = 'empty'
REDACT_DATE = 'date'
REDACT_IDENTIFIER = 'identifier'

# the lack of a rule means it shows fully
REDACT_RULES = {
btr_auth.USER_PUBLIC: {
'prefName': REDACT_EMPTY,
'email': REDACT_EMPTY,
'phone': REDACT_EMPTY,
'postal': REDACT_EMPTY,
'street': REDACT_EMPTY,
'streetAdditional': REDACT_EMPTY,
'locationDescription': REDACT_EMPTY,
'birthDate': REDACT_EMPTY,
'identifiers': REDACT_EMPTY,
},
btr_auth.USER_STAFF: {
'email': REDACT_EMAIL,
'phone': REDACT_PHONE,
'postal': REDACT_EMPTY,
'street': REDACT_EMPTY,
'streetAdditional': REDACT_EMPTY,
'locationDescription': REDACT_EMPTY,
'birthDate': REDACT_DATE,
'identifiers': REDACT_IDENTIFIER,
},
}


def redactInformation(request, payload):
role = btr_auth.getUserType(request)
if role == btr_auth.USER_COMPETENT_AUTHORITY:
return payload
# otherwise PUBLIC or STAFF
redactionToUse = REDACT_RULES[role]
for person in payload['payload']['personStatements']:
for name in person['names']:
if name['type'] == 'alternative':
name['fullName'] = redactField(name['fullName'], redactionToUse.get('prefName'))

person['email'] = redactField(person['email'], redactionToUse.get('email'))
person['phoneNumber']['number'] = redactField(person['phoneNumber']['number'], redactionToUse.get('phone'))
for address in person['addresses']:
address['postalCode'] = redactField(address['postalCode'], redactionToUse.get('postal'))
address['street'] = redactField(address['street'], redactionToUse.get('street'))
address['streetAdditional'] = redactField(
address['streetAdditional'], redactionToUse.get('streetAdditional')
)
address['locationDescription'] = redactField(
address['locationDescription'], redactionToUse.get('locationDescription')
)

person['placeOfResidence']['postalCode'] = redactField(
person['placeOfResidence']['postalCode'], redactionToUse.get('postal')
)
person['placeOfResidence']['street'] = redactField(
person['placeOfResidence']['street'], redactionToUse.get('street')
)
person['placeOfResidence']['streetAdditional'] = redactField(
person['placeOfResidence']['streetAdditional'], redactionToUse.get('streetAdditional')
)
person['placeOfResidence']['locationDescription'] = redactField(
person['placeOfResidence']['locationDescription'], redactionToUse.get('locationDescription')
)
person['birthDate'] = redactField(person['birthDate'], redactionToUse.get('birthDate'))
for identifier in person['identifiers']:
identifier['id'] = redactField(identifier['id'], redactionToUse.get('identifiers'))
return jsonify(payload)


def redactField(field, redactType):
if redactType == REDACT_MONONYM:
return field[0:1] + '***'
elif redactType == REDACT_MONONYM_FN:
rv = ''
for word in field.split():
if not (rv == ''):
rv += ' '
rv += word[0:1] + '***'
return rv
elif redactType == REDACT_EMAIL:
return field.split('@')[0][0:1] + '***' + '@***.' + field.split('.')[-1]
elif redactType == REDACT_PHONE:
return field[0:-7] + '***'
elif redactType == REDACT_FULL:
return '***'
# This is a space instead of blank because if it's empty the ui shows undefined in some spots
elif redactType == REDACT_EMPTY:
return ' '
elif redactType == REDACT_DATE:
return field.split('-')[0]
elif redactType == REDACT_IDENTIFIER:
return '*** **' + field[6:]

return field


@bp.route('/<id>', methods=('GET',))
def registers(id: int | None = None): # pylint: disable=redefined-builtin
"""Get the submissions, or specific submission by id."""
Expand All @@ -189,12 +87,12 @@ def get_entity_submission(business_identifier: str):

try:
btr_auth.is_authorized(request=request, business_identifier=business_identifier)
account_id = request.args.get('account_id')
account_id = request.headers.get('Account-Id', None)
btr_auth.product_authorizations(request=request, account_id=account_id)

submission = SubmissionModel.find_by_business_identifier(business_identifier)
if submission:
return redactInformation(request, SubmissionSerializer.to_dict(submission))
return jsonify(redactInformation(request, SubmissionSerializer.to_dict(submission)))

return {}, HTTPStatus.NOT_FOUND

Expand Down Expand Up @@ -258,7 +156,7 @@ def create_register():
# update record in BOR (search)
token = btr_auth.get_bearer_token()
entity_addresses: dict[str, dict[str, dict]] = btr_entity.get_entity_info(
jwt, f"{identifier}/addresses"
jwt, f'{identifier}/addresses'
).json()
entity['business']['addresses'] = [entity_addresses.get('registeredOffice', {}).get('deliveryAddress')]
btr_bor.update_owners(submission, entity, token)
Expand Down
35 changes: 13 additions & 22 deletions btr-api/src/btr_api/services/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
from flask import Flask
from requests import exceptions

from btr_api.common.auth import jwt


class AuthService:
"""Provides utility functions for connecting with the BC Registries auth-api and SSO service."""
Expand Down Expand Up @@ -75,27 +77,21 @@ def init_app(self, app: Flask):
"""
Get the type of user based on their products, roles, and organization membership.
Args:
user: User object containing product_resp and auth_resp attributes.
Returns:
str: The type of the user (USER_COMPETENT_AUTHORITY, USER_STAFF, or USER_PUBLIC).
"""

def getUserType(self, user):
def getUserType(self):
try:
products = user.product_resp.json()
self.app.logger.info(products)
roles = user.auth_resp.json()['roles']
products = jwt.products.json()
if any(
product
for product in products
if product['code'].upper() == self.CA_PRODUCT and product['subscriptionStatus'].upper() == 'ACTIVE'
):
self.app.logger.info('Get User Type: ' + self.USER_COMPETENT_AUTHORITY)
return self.USER_COMPETENT_AUTHORITY
if any(role for role in roles if role == self.STAFF_ROLE):
self.app.logger.info('Get User Type: ' + self.USER_STAFF)
if jwt.validate_roles([self.STAFF_ROLE]):
return self.USER_STAFF
except Exception as e:
self.app.logger.error('Error in Get User Type: ' + str(e))
Expand Down Expand Up @@ -134,7 +130,7 @@ def get_authorization_header(self, request) -> str:
"""Gets authorization header from request."""
authorization_header = request.headers.get('Authorization', None)
if not authorization_header:
error = f"Missing authorization header: {request.headers}"
error = f'Missing authorization header: {request.headers}'
self.app.logger.debug('Cannot find authorization_header in request.')
raise AuthException(error=error, status_code=HTTPStatus.UNAUTHORIZED)

Expand All @@ -146,30 +142,26 @@ def product_authorizations(self, request, account_id: str) -> bool:
if not account_id:
error = 'Missing account_id'
self.app.logger.debug(error)
request.product_resp = {}
return True
try:
auth_token = self.get_authorization_header(request)
# make api call
headers = {'Authorization': auth_token}
auth_url = f"{self.svc_url}/orgs/{account_id}/products?include_hidden=true"
auth_url = f'{self.svc_url}/orgs/{account_id}/products?include_hidden=true'
self.app.logger.debug(auth_url)
resp = requests.get(url=auth_url, headers=headers, timeout=self.timeout)

if resp.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
error = f"{resp.status_code} - {str(resp.json())}"
error = f'{resp.status_code} - {str(resp.json())}'
self.app.logger.debug('Invalid response from auth-api: %s', error)
request.product_resp = {}

if resp.status_code != HTTPStatus.OK:
error = f"Unauthorized access to products {account_id}"
error = f'Unauthorized access to products {account_id}'
self.app.logger.debug(error)
request.product_resp = {}

request.product_resp = resp
jwt.products = resp
return True
except Exception as err:
request.product_resp = {}
self.app.logger.debug(err)
return True

Expand All @@ -180,21 +172,20 @@ def is_authorized(self, request, business_identifier: str) -> bool:
auth_token = self.get_authorization_header(request)
# make api call
headers = {'Authorization': auth_token}
auth_url = f"{self.svc_url}/entities/{business_identifier}/authorizations"
auth_url = f'{self.svc_url}/entities/{business_identifier}/authorizations'

resp = requests.get(url=auth_url, headers=headers, timeout=self.timeout)

if resp.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR:
error = f"{resp.status_code} - {str(resp.json())}"
error = f'{resp.status_code} - {str(resp.json())}'
self.app.logger.debug('Invalid response from auth-api: %s', error)
raise ExternalServiceException(error=error, status_code=HTTPStatus.SERVICE_UNAVAILABLE)

if resp.status_code != HTTPStatus.OK or 'edit' not in resp.json().get('roles', []):
error = f"Unauthorized access to business: {business_identifier}"
error = f'Unauthorized access to business: {business_identifier}'
self.app.logger.debug(error)
raise AuthException(error=error, status_code=HTTPStatus.FORBIDDEN)

request.auth_resp = resp
return True
except AuthException as e:
# pass along
Expand Down
1 change: 1 addition & 0 deletions btr-api/src/btr_api/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .redact import redactInformation, redactField
98 changes: 98 additions & 0 deletions btr-api/src/btr_api/utils/redact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from flask import current_app

from btr_api.services import btr_auth
from btr_api.enums import RedactionType

# the lack of a rule means it shows fully
REDACT_RULES = {
btr_auth.USER_PUBLIC: {
'prefName': RedactionType.REDACT_EMPTY,
'email': RedactionType.REDACT_EMPTY,
'phone': RedactionType.REDACT_EMPTY,
'postal': RedactionType.REDACT_EMPTY,
'street': RedactionType.REDACT_EMPTY,
'streetAdditional': RedactionType.REDACT_EMPTY,
'locationDescription': RedactionType.REDACT_EMPTY,
'birthDate': RedactionType.REDACT_EMPTY,
'identifiers': RedactionType.REDACT_EMPTY,
},
btr_auth.USER_STAFF: {
'email': RedactionType.REDACT_EMAIL,
'phone': RedactionType.REDACT_PHONE,
'postal': RedactionType.REDACT_EMPTY,
'street': RedactionType.REDACT_EMPTY,
'streetAdditional': RedactionType.REDACT_EMPTY,
'locationDescription': RedactionType.REDACT_EMPTY,
'birthDate': RedactionType.REDACT_DATE,
'identifiers': RedactionType.REDACT_IDENTIFIER,
},
}

def redactInformation(request, payload):
role = btr_auth.getUserType()
if role == btr_auth.USER_COMPETENT_AUTHORITY:
return payload
# otherwise PUBLIC or STAFF
current_app.logger.info(RedactionType.REDACT_MONONYM)
redactionToUse = REDACT_RULES[role]
current_app.logger.info(redactionToUse)
for person in payload['payload']['personStatements']:
for name in person['names']:
if name['type'] == 'alternative':
name['fullName'] = redactField(name['fullName'], redactionToUse.get('prefName'))

person['email'] = redactField(person['email'], redactionToUse.get('email'))
person['phoneNumber']['number'] = redactField(person['phoneNumber']['number'], redactionToUse.get('phone'))
for address in person['addresses']:
address['postalCode'] = redactField(address['postalCode'], redactionToUse.get('postal'))
address['street'] = redactField(address['street'], redactionToUse.get('street'))
address['streetAdditional'] = redactField(
address['streetAdditional'], redactionToUse.get('streetAdditional')
)
address['locationDescription'] = redactField(
address['locationDescription'], redactionToUse.get('locationDescription')
)

person['placeOfResidence']['postalCode'] = redactField(
person['placeOfResidence']['postalCode'], redactionToUse.get('postal')
)
person['placeOfResidence']['street'] = redactField(
person['placeOfResidence']['street'], redactionToUse.get('street')
)
person['placeOfResidence']['streetAdditional'] = redactField(
person['placeOfResidence']['streetAdditional'], redactionToUse.get('streetAdditional')
)
person['placeOfResidence']['locationDescription'] = redactField(
person['placeOfResidence']['locationDescription'], redactionToUse.get('locationDescription')
)
person['birthDate'] = redactField(person['birthDate'], redactionToUse.get('birthDate'))
for identifier in person['identifiers']:
identifier['id'] = redactField(identifier['id'], redactionToUse.get('identifiers'))
return payload


def redactField(field, redactType):
if redactType == RedactionType.REDACT_MONONYM:
return field[0:1] + '***'
elif redactType == RedactionType.REDACT_MONONYM_FN:
rv = ''
for word in field.split():
if not (rv == ''):
rv += ' '
rv += word[0:1] + '***'
return rv
elif redactType == RedactionType.REDACT_EMAIL:
return field.split('@')[0][0:1] + '***' + '@***.' + field.split('.')[-1]
elif redactType == RedactionType.REDACT_PHONE:
return field[0:-7] + '***'
elif redactType == RedactionType.REDACT_FULL:
return '***'
# This is a space instead of blank because if it's empty the ui shows undefined in some spots
elif redactType == RedactionType.REDACT_EMPTY:
return ' '
elif redactType == RedactionType.REDACT_DATE:
return field.split('-')[0]
elif redactType == RedactionType.REDACT_IDENTIFIER:
return '*** **' + field[6:]

return field
Loading

0 comments on commit a602e40

Please sign in to comment.