Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test branch #948

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 59 additions & 31 deletions tests/functional/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from jsonpath_rw import parse
from pytest_check import check
import urllib
import uuid
from .utils.apigee_api_apps import ApigeeApiDeveloperApps
from .utils.apigee_api_products import ApigeeApiProducts
from .configuration import config
from .configuration.config import BASE_URL, PDS_BASE_PATH
import random
from requests import Response, get, patch
from requests import Response, get, patch, post
import json
import jwt
from typing import Union
from .data.searches import Search
from .data.updates import Update
Expand All @@ -33,6 +35,10 @@
DeveloperAppsAPI,
)

import logging


LOGGER = logging.getLogger(__name__)
FILE_DIR = os.path.dirname(__file__)
RESPONSES_DIR = os.path.join(FILE_DIR, 'data', 'responses')

Expand Down Expand Up @@ -100,13 +106,16 @@ def provide_p9_auth_details(request) -> None:
def add_scope_to_products_patient_access(products_api: ApiProductsAPI,
nhsd_apim_proxy_name: str,
nhsd_apim_authorization: dict):
LOGGER.info(nhsd_apim_authorization['scope'])
product_name = nhsd_apim_proxy_name.replace("-asid-required", "")

default_product = products_api.get_product_by_name(product_name=product_name)
if nhsd_apim_authorization['scope'] not in default_product['scopes']:
default_product['scopes'].append(nhsd_apim_authorization['scope'])
products_api.put_product_by_name(product_name=product_name, body=default_product)
time.sleep(2)
LOGGER.info(default_product)
LOGGER.info(default_product['scopes'])
# if nhsd_apim_authorization['scope'] not in default_product['scopes']:
# default_product['scopes'].append(nhsd_apim_authorization['scope'])
# products_api.put_product_by_name(product_name=product_name, body=default_product)
# time.sleep(2)


@given("I am a P5 user")
Expand Down Expand Up @@ -226,6 +235,32 @@ def empty_header(headers_with_authorization: dict, field: str) -> dict:
return headers_with_authorization


@given("I have an expired access token", target_fixture='headers_with_authorization')
def add_expired_token_to_auth_header(headers_with_authorization: dict,
encoded_jwt: dict,
identity_service_base_url: str) -> dict:
response = post(
f"{identity_service_base_url}/token",
data={
"_access_token_expiry_ms": "1",
"grant_type": "client_credentials",
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": encoded_jwt,
},
)

assert response.status_code == 200, f'POST /token failed. Response:\n {response.text}'

response_json = response.json()
assert response_json["expires_in"] and int(response_json["expires_in"]) == 0

token_value = response_json['access_token']
headers_with_authorization.update({
'Authorization': f'Bearer {token_value}'
})
return headers_with_authorization


@pytest.fixture
def query_params(search: Search) -> str:
return urllib.parse.urlencode(search.query)
Expand All @@ -243,20 +278,8 @@ def retrieve_related_person(headers_with_authorization: dict, nhs_number: str, p


@when("I update the patient's PDS record", target_fixture='response')
def update_patient(headers_with_authorization: dict, update: Update, pds_url: str) -> Response:
headers = headers_with_authorization
headers.update({
"Content-Type": "application/json-patch+json",
"If-Match": update.etag,
})

return patch(url=f"{pds_url}/Patient/{update.nhs_number}",
headers=headers,
json=update.patches)


@when("I update another patient's PDS record", target_fixture='response')
def update_another_patient(headers_with_authorization: dict, update: Update, pds_url: str) -> Response:
def update_patient(headers_with_authorization: dict, update: Update, pds_url: str) -> Response:
headers = headers_with_authorization
headers.update({
"Content-Type": "application/json-patch+json",
Expand All @@ -281,19 +304,6 @@ def update_patient_incorrect_path(headers_with_authorization: dict, update: Upda
json=update.patches)


@when("I update my own PDS record", target_fixture='response')
def update_patient_self(headers_with_authorization: dict, update_self: Update, pds_url: str) -> Response:
headers = headers_with_authorization
headers.update({
"Content-Type": "application/json-patch+json",
"If-Match": update_self.etag,
})

return patch(url=f"{pds_url}/Patient/{update_self.nhs_number}",
headers=headers,
json=update_self.patches)


@when(
parsers.cfparse(
"I hit the /{endpoint:String} endpoint",
Expand Down Expand Up @@ -612,6 +622,24 @@ def create_random_date():
return new_date


@pytest.fixture()
def encoded_jwt(identity_service_base_url: str):
claims = {
"sub": config.APPLICATION_RESTRICTED_API_KEY,
"iss": config.APPLICATION_RESTRICTED_API_KEY,
"jti": str(uuid.uuid4()),
"aud": f"{identity_service_base_url}/token",
"exp": int(time.time()) + 300,
}

encoded_jwt = jwt.encode(claims,
config.SIGNING_KEY,
algorithm="RS512",
headers={"kid": config.KEY_ID})

return encoded_jwt


@pytest.fixture()
def app():
"""
Expand Down
13 changes: 12 additions & 1 deletion tests/functional/features/patient_access_retrieve.feature
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,15 @@ Feature: Patient Access (Retrieve)

Then I get a 403 HTTP response code
And ACCESS_DENIED is at issue[0].details.coding[0].code in the response body
And Patient cannot perform this action is at issue[0].details.coding[0].display in the response body
And Patient cannot perform this action is at issue[0].details.coding[0].display in the response body

Scenario: Patient cannot retrieve their record with an expired token
Given I am a P9 user
And scope added to product
And I have an expired access token

When I retrieve my details

Then I get a 401 HTTP response code
And ACCESS_DENIED is at issue[0].details.coding[0].code in the response body
And Access Token expired is at issue[0].diagnostics in the response body
16 changes: 10 additions & 6 deletions tests/functional/test_patient_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@

@pytest.fixture(scope='function')
def headers_with_authorization(_nhsd_apim_auth_token_data: dict,
identity_service_base_url: str,
add_asid_to_testapp: None) -> dict:
"""Assign required headers with the Authorization header"""

access_token = _nhsd_apim_auth_token_data.get("access_token", "")

headers = {"X-Request-ID": str(uuid.uuid1()),
"X-Correlation-ID": str(uuid.uuid1()),
"Authorization": f'Bearer {access_token}'
}

headers = {
"X-Request-ID": str(uuid.uuid1()),
"X-Correlation-ID": str(uuid.uuid1()),
"Authorization": f'Bearer {access_token}'
}
return headers


Expand Down Expand Up @@ -52,6 +51,11 @@ def test_cannot_retrieve_incorrect_path():
pass


@retrieve_scenario('Patient cannot retrieve their record with an expired token')
def test_cannot_retrieve_with_expired_token():
pass


@update_scenario('Patient cannot update another patient')
def test_cannot_update_another_patient():
pass
Expand Down
Loading