diff --git a/.github/workflows/base-lambdas-reusable-deploy-all.yml b/.github/workflows/base-lambdas-reusable-deploy-all.yml index 195632f5b..f1185c986 100644 --- a/.github/workflows/base-lambdas-reusable-deploy-all.yml +++ b/.github/workflows/base-lambdas-reusable-deploy-all.yml @@ -402,4 +402,17 @@ jobs: secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + deploy_mns_notification_lambda: + name: Deploy mns notification lambda + uses: ./.github/workflows/base-lambdas-reusable-deploy.yml + with: + environment: ${{ inputs.environment}} + python_version: ${{ inputs.python_version }} + build_branch: ${{ inputs.build_branch}} + sandbox: ${{ inputs.sandbox }} + lambda_handler_name: mns_notification_handler + lambda_aws_name: MNSNotificationLambda + lambda_layer_names: 'core_lambda_layer' + secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} diff --git a/.github/workflows/subscribe-to-mns.yml b/.github/workflows/subscribe-to-mns.yml index c49f68eae..1325ea5ef 100644 --- a/.github/workflows/subscribe-to-mns.yml +++ b/.github/workflows/subscribe-to-mns.yml @@ -16,11 +16,10 @@ on: AWS_ASSUME_ROLE: required: true permissions: - pull-requests: write id-token: write # This is required for requesting the JWT contents: read # This is required for actions/checkout jobs: - batch_update_build_docker_image: + placeholder: runs-on: ubuntu-latest environment: ${{ inputs.environment }} defaults: @@ -28,6 +27,6 @@ jobs: working-directory: lambdas steps: - name: Placeholder - run: | - echo "Running placeholder job on ${inputs.sandbox}" + run: | + echo "Running placeholder job on ${inputs.sandbox}" diff --git a/lambdas/enums/mns_notification_types.py b/lambdas/enums/mns_notification_types.py new file mode 100644 index 000000000..9ae50aa02 --- /dev/null +++ b/lambdas/enums/mns_notification_types.py @@ -0,0 +1,6 @@ +from enum import StrEnum + + +class MNSNotificationTypes(StrEnum): + CHANGE_OF_GP = "pds-change-of-gp-1" + DEATH_NOTIFICATION = "pds-death-notification-1" diff --git a/lambdas/handlers/mns_notification_handler.py b/lambdas/handlers/mns_notification_handler.py new file mode 100644 index 000000000..012d3401b --- /dev/null +++ b/lambdas/handlers/mns_notification_handler.py @@ -0,0 +1,50 @@ +import json + +from enums.mns_notification_types import MNSNotificationTypes +from models.mns_sqs_message import MNSSQSMessage +from pydantic_core._pydantic_core import ValidationError +from services.process_mns_message_service import MNSNotificationService +from utils.audit_logging_setup import LoggingService +from utils.decorators.ensure_env_var import ensure_environment_variables +from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions +from utils.decorators.override_error_check import override_error_check +from utils.decorators.set_audit_arg import set_request_context_for_logging +from utils.request_context import request_context + +logger = LoggingService(__name__) + + +@set_request_context_for_logging +@ensure_environment_variables( + names=[ + "APPCONFIG_CONFIGURATION", + "APPCONFIG_ENVIRONMENT", + "LLOYD_GEORGE_DYNAMODB_NAME", + "MNS_NOTIFICATION_QUEUE_URL", + ] +) +@override_error_check +@handle_lambda_exceptions +def lambda_handler(event, context): + logger.info(f"Received MNS notification event: {event}") + notification_service = MNSNotificationService() + sqs_messages = event.get("Records", []) + + for sqs_message in sqs_messages: + try: + sqs_message = json.loads(sqs_message["body"]) + + mns_message = MNSSQSMessage(**sqs_message) + MNSSQSMessage.model_validate(mns_message) + + request_context.patient_nhs_no = mns_message.subject.nhs_number + + if mns_message.type in MNSNotificationTypes.__members__.values(): + notification_service.handle_mns_notification(mns_message) + + except ValidationError as error: + logger.error("Malformed MNS notification message") + logger.error(error) + except Exception as error: + logger.error(f"Error processing SQS message: {error}.") + logger.info("Continuing to next message.") diff --git a/lambdas/models/mns_sqs_message.py b/lambdas/models/mns_sqs_message.py new file mode 100644 index 000000000..2431da3be --- /dev/null +++ b/lambdas/models/mns_sqs_message.py @@ -0,0 +1,23 @@ +from pydantic import AliasGenerator, BaseModel, ConfigDict +from pydantic.alias_generators import to_camel + + +class MNSMessageSubject(BaseModel): + model_config = ConfigDict( + alias_generator=AliasGenerator( + validation_alias=to_camel, serialization_alias=to_camel + ), + ) + nhs_number: str + + +class MNSSQSMessage(BaseModel): + model_config = ConfigDict( + alias_generator=AliasGenerator( + validation_alias=to_camel, serialization_alias=to_camel + ), + ) + id: str + type: str + subject: MNSMessageSubject + data: dict diff --git a/lambdas/services/process_mns_message_service.py b/lambdas/services/process_mns_message_service.py new file mode 100644 index 000000000..ca8fb6e81 --- /dev/null +++ b/lambdas/services/process_mns_message_service.py @@ -0,0 +1,146 @@ +import os +from datetime import datetime + +from botocore.exceptions import ClientError +from enums.death_notification_status import DeathNotificationStatus +from enums.metadata_field_names import DocumentReferenceMetadataFields +from enums.mns_notification_types import MNSNotificationTypes +from enums.patient_ods_inactive_status import PatientOdsInactiveStatus +from models.mns_sqs_message import MNSSQSMessage +from services.base.dynamo_service import DynamoDBService +from services.base.sqs_service import SQSService +from utils.audit_logging_setup import LoggingService +from utils.exceptions import PdsErrorException +from utils.utilities import get_pds_service + +logger = LoggingService(__name__) + + +class MNSNotificationService: + def __init__(self): + self.dynamo_service = DynamoDBService() + self.table = os.getenv("LLOYD_GEORGE_DYNAMODB_NAME") + self.pds_service = get_pds_service() + self.sqs_service = SQSService() + self.queue = os.getenv("MNS_NOTIFICATION_QUEUE_URL") + + def handle_mns_notification(self, message: MNSSQSMessage): + try: + match message.type: + case MNSNotificationTypes.CHANGE_OF_GP: + logger.info("Handling GP change notification.") + self.handle_gp_change_notification(message) + case MNSNotificationTypes.DEATH_NOTIFICATION: + logger.info("Handling death status notification.") + self.handle_death_notification(message) + + except PdsErrorException: + logger.info("An error occurred when calling PDS") + self.send_message_back_to_queue(message) + + except ClientError as e: + logger.info( + f"Unable to process message: {message.id}, of type: {message.type}" + ) + logger.info(f"{e}") + + def handle_gp_change_notification(self, message: MNSSQSMessage): + patient_document_references = self.get_patient_documents( + message.subject.nhs_number + ) + + if not self.patient_is_present_in_ndr(patient_document_references): + return + + updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number) + + for reference in patient_document_references: + if reference["CurrentGpOds"] is not updated_ods_code: + self.dynamo_service.update_item( + table_name=self.table, + key=reference["ID"], + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: updated_ods_code, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.now().timestamp() + ), + }, + ) + + logger.info("Update complete for change of GP") + + def handle_death_notification(self, message: MNSSQSMessage): + death_notification_type = message.data["deathNotificationStatus"] + match death_notification_type: + case DeathNotificationStatus.INFORMAL: + logger.info( + "Patient is deceased - INFORMAL, moving on to the next message." + ) + return + + case DeathNotificationStatus.REMOVED: + patient_documents = self.get_patient_documents( + message.subject.nhs_number + ) + if not self.patient_is_present_in_ndr(patient_documents): + return + + updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number) + self.update_patient_ods_code(patient_documents, updated_ods_code) + logger.info("Update complete for death notification change.") + + case DeathNotificationStatus.FORMAL: + patient_documents = self.get_patient_documents( + message.subject.nhs_number + ) + if not self.patient_is_present_in_ndr(patient_documents): + return + + self.update_patient_ods_code( + patient_documents, PatientOdsInactiveStatus.DECEASED + ) + logger.info( + f"Update complete, patient marked {PatientOdsInactiveStatus.DECEASED}." + ) + + def get_patient_documents(self, nhs_number: str): + logger.info("Getting patient document references...") + response = self.dynamo_service.query_table_by_index( + table_name=self.table, + index_name="NhsNumberIndex", + search_key="NhsNumber", + search_condition=nhs_number, + ) + return response["Items"] + + def update_patient_ods_code(self, patient_documents: list[dict], code: str) -> None: + for document in patient_documents: + logger.info("Updating patient document reference...") + self.dynamo_service.update_item( + table_name=self.table, + key=document["ID"], + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: code, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.now().timestamp() + ), + }, + ) + + def get_updated_gp_ods(self, nhs_number: str) -> str: + patient_details = self.pds_service.fetch_patient_details(nhs_number) + return patient_details.general_practice_ods + + def send_message_back_to_queue(self, message: MNSSQSMessage): + logger.info("Sending message back to queue...") + self.sqs_service.send_message_standard( + queue_url=self.queue, message_body=message.model_dump_json(by_alias=True) + ) + + def patient_is_present_in_ndr(self, dynamo_response): + if len(dynamo_response) < 1: + logger.info("Patient is not held in the National Document Repository.") + logger.info("Moving on to the next message.") + return False + else: + return True diff --git a/lambdas/tests/unit/conftest.py b/lambdas/tests/unit/conftest.py index 0ab97fc6b..217bc6e1a 100644 --- a/lambdas/tests/unit/conftest.py +++ b/lambdas/tests/unit/conftest.py @@ -32,6 +32,7 @@ MOCK_LG_STAGING_STORE_BUCKET_ENV_NAME = "STAGING_STORE_BUCKET_NAME" MOCK_LG_METADATA_SQS_QUEUE_ENV_NAME = "METADATA_SQS_QUEUE_URL" MOCK_LG_INVALID_SQS_QUEUE_ENV_NAME = "INVALID_SQS_QUEUE_URL" +MOCK_MNS_SQS_QUEUE_ENV_NAME = "MNS_SQS_QUEUE_URL" MOCK_LG_BULK_UPLOAD_DYNAMO_ENV_NAME = "BULK_UPLOAD_DYNAMODB_NAME" MOCK_AUTH_DYNAMODB_NAME = "AUTH_DYNAMODB_NAME" @@ -171,9 +172,11 @@ def set_env(monkeypatch): ) monkeypatch.setenv("NRL_API_ENDPOINT", FAKE_URL) monkeypatch.setenv("NRL_END_USER_ODS_CODE", "test_nrl_user_ods_ssm_key") + monkeypatch.setenv("MNS_NOTIFICATION_QUEUE_URL", MOCK_MNS_SQS_QUEUE_ENV_NAME) monkeypatch.setenv("NRL_SQS_QUEUE_URL", NRL_SQS_URL) + EXPECTED_PARSED_PATIENT_BASE_CASE = PatientDetails( givenName=["Jane"], familyName="Smith", diff --git a/lambdas/tests/unit/handlers/test_mns_notification_handler.py b/lambdas/tests/unit/handlers/test_mns_notification_handler.py new file mode 100644 index 000000000..cce52779a --- /dev/null +++ b/lambdas/tests/unit/handlers/test_mns_notification_handler.py @@ -0,0 +1,117 @@ +import json +from copy import deepcopy + +import pytest +from enums.mns_notification_types import MNSNotificationTypes +from handlers.mns_notification_handler import lambda_handler +from tests.unit.conftest import FAKE_URL, TEST_NHS_NUMBER, TEST_UUID + +MOCK_TIME = "2022-04-05T17:31:00.000Z" + +MOCK_GP_CHANGE_MESSAGE_BODY = { + "id": TEST_UUID, + "type": MNSNotificationTypes.CHANGE_OF_GP.value, + "subject": { + "nhsNumber": TEST_NHS_NUMBER, + "familyName": "SMITH", + "dob": "2017-10-02", + }, + "source": { + "name": FAKE_URL, + "identifiers": { + "system": FAKE_URL, + "value": TEST_UUID, + }, + }, + "time": MOCK_TIME, + "data": { + "fullUrl": FAKE_URL, + "versionId": TEST_UUID, + "provenance": { + "name": "Fake GP", + "identifiers": { + "system": FAKE_URL, + "value": TEST_UUID, + }, + }, + "registrationEncounterCode": "00", + }, +} + +MOCK_DEATH_MESSAGE_BODY = { + "id": TEST_UUID, + "type": "pds-death-notification-1", + "subject": { + "dob": "2017-10-02", + "familyName": "DAWKINS", + "nhsNumber": TEST_NHS_NUMBER, + }, + "source": { + "name": "NHS DIGITAL", + "identifier": { + "system": "https://fhir.nhs.uk/Id/nhsSpineASID", + "value": "477121000324", + }, + }, + "time": MOCK_TIME, + "data": { + "versionId": 'W/"16"', + "fullUrl": "https://int.api.service.nhs.uk/personal-demographics/FHIR/R4/Patient/9912003888", + "deathNotificationStatus": "2", + "provenance": { + "name": "The GP Practice", + "identifiers": { + "system": "https://fhir.nhs.uk/Id/nhsSpineASID", + "value": "477121000323", + }, + }, + }, +} + +MOCK_OTHER_NOTIFICATION_MESSAGE_BODY = deepcopy(MOCK_GP_CHANGE_MESSAGE_BODY) +MOCK_OTHER_NOTIFICATION_MESSAGE_BODY["type"] = "imms-vaccinations-1" + +MOCK_INFORMAL_DEATH_MESSAGE_BODY = deepcopy(MOCK_DEATH_MESSAGE_BODY) +MOCK_INFORMAL_DEATH_MESSAGE_BODY["data"]["deathNotificationStatus"] = "1" + +MOCK_REMOVED_DEATH_MESSAGE_BODY = deepcopy(MOCK_DEATH_MESSAGE_BODY) +MOCK_REMOVED_DEATH_MESSAGE_BODY["data"]["deathNotificationStatus"] = "U" + + +@pytest.fixture +def mock_service(mocker): + mocked_class = mocker.patch( + "handlers.mns_notification_handler.MNSNotificationService" + ) + mocked_instance = mocked_class.return_value + mocker.patch.object(mocked_instance, "dynamo_service") + mocker.patch.object(mocked_instance, "pds_service") + yield mocked_instance + + +def test_handle_notification_called_message_type_gp_change( + context, set_env, mock_service +): + + event = {"Records": [{"body": json.dumps(MOCK_GP_CHANGE_MESSAGE_BODY)}]} + lambda_handler(event, context) + + mock_service.handle_mns_notification.assert_called() + + +def test_handle_notification_called_message_type_death_notification( + context, set_env, mock_service +): + event = {"Records": [{"body": json.dumps(MOCK_DEATH_MESSAGE_BODY)}]} + lambda_handler(event, context) + + mock_service.handle_mns_notification.assert_called() + + +def test_handle_notification_not_called_no_records_in_event( + context, set_env, mock_service +): + event = {"Records": []} + lambda_handler(event, context) + + mock_service.handle_mns_notification.assert_not_called() diff --git a/lambdas/tests/unit/services/test_process_mns_message_service.py b/lambdas/tests/unit/services/test_process_mns_message_service.py new file mode 100644 index 000000000..2cd50e1ae --- /dev/null +++ b/lambdas/tests/unit/services/test_process_mns_message_service.py @@ -0,0 +1,210 @@ +from datetime import datetime +from unittest.mock import call + +import pytest +from enums.metadata_field_names import DocumentReferenceMetadataFields +from enums.patient_ods_inactive_status import PatientOdsInactiveStatus +from freezegun import freeze_time +from models.mns_sqs_message import MNSSQSMessage +from services.process_mns_message_service import MNSNotificationService +from tests.unit.conftest import TEST_CURRENT_GP_ODS +from tests.unit.handlers.test_mns_notification_handler import ( + MOCK_DEATH_MESSAGE_BODY, + MOCK_GP_CHANGE_MESSAGE_BODY, + MOCK_INFORMAL_DEATH_MESSAGE_BODY, + MOCK_REMOVED_DEATH_MESSAGE_BODY, +) +from tests.unit.helpers.data.dynamo_responses import ( + MOCK_EMPTY_RESPONSE, + MOCK_SEARCH_RESPONSE, +) +from utils.exceptions import PdsErrorException + + +@pytest.fixture +def mns_service(mocker, set_env, monkeypatch): + monkeypatch.setenv("PDS_FHIR_IS_STUBBED", "False") + service = MNSNotificationService() + mocker.patch.object(service, "dynamo_service") + mocker.patch.object(service, "get_updated_gp_ods") + mocker.patch.object(service, "sqs_service") + yield service + + +@pytest.fixture +def mock_handle_gp_change(mocker, mns_service): + service = mns_service + mocker.patch.object(service, "handle_gp_change_notification") + yield service + + +@pytest.fixture +def mock_handle_death_notification(mocker, mns_service): + service = mns_service + mocker.patch.object(service, "handle_death_notification") + + +MOCK_UPDATE_TIME = "2024-01-01 12:00:00" + +gp_change_message = MNSSQSMessage(**MOCK_GP_CHANGE_MESSAGE_BODY) +death_notification_message = MNSSQSMessage(**MOCK_DEATH_MESSAGE_BODY) +informal_death_notification_message = MNSSQSMessage(**MOCK_INFORMAL_DEATH_MESSAGE_BODY) +removed_death_notification_message = MNSSQSMessage(**MOCK_REMOVED_DEATH_MESSAGE_BODY) + + +def test_handle_gp_change_message_called_message_type_gp_change( + mns_service, mock_handle_gp_change, mock_handle_death_notification +): + mns_service.handle_mns_notification(gp_change_message) + + mns_service.handle_death_notification.assert_not_called() + mns_service.handle_gp_change_notification.assert_called_with(gp_change_message) + + +def test_handle_gp_change_message_not_called_message_death_message( + mns_service, mock_handle_death_notification, mock_handle_gp_change +): + mns_service.handle_mns_notification(death_notification_message) + + mns_service.handle_gp_change_notification.assert_not_called() + mns_service.handle_death_notification.assert_called_with(death_notification_message) + + +def test_has_patient_in_ndr_populate_response_from_dynamo(mns_service): + response = MOCK_SEARCH_RESPONSE["Items"] + assert mns_service.patient_is_present_in_ndr(response) is True + + +def test_has_patient_in_ndr_empty_dynamo_response(mns_service): + response = MOCK_EMPTY_RESPONSE["Items"] + assert mns_service.patient_is_present_in_ndr(response) is False + + +def test_handle_notification_not_called_message_type_not_death_or_gp_notification( + mns_service, +): + mns_service.handle_mns_notification(informal_death_notification_message) + mns_service.get_updated_gp_ods.assert_not_called() + + +def test_pds_is_called_death_notification_removed(mns_service, mocker): + mocker.patch.object(mns_service, "update_patient_ods_code") + mns_service.dynamo_service.query_table_by_index.return_value = MOCK_SEARCH_RESPONSE + mns_service.handle_mns_notification(removed_death_notification_message) + + mns_service.get_updated_gp_ods.assert_called() + mns_service.update_patient_ods_code.assert_called() + + +@freeze_time(MOCK_UPDATE_TIME) +def test_update_patient_details(mns_service): + mns_service.update_patient_ods_code( + MOCK_SEARCH_RESPONSE["Items"], PatientOdsInactiveStatus.DECEASED + ) + calls = [ + call( + table_name=mns_service.table, + key="3d8683b9-1665-40d2-8499-6e8302d507ff", + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: PatientOdsInactiveStatus.DECEASED, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.fromisoformat(MOCK_UPDATE_TIME).timestamp() + ), + }, + ), + call( + table_name=mns_service.table, + key="4d8683b9-1665-40d2-8499-6e8302d507ff", + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: PatientOdsInactiveStatus.DECEASED, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.fromisoformat(MOCK_UPDATE_TIME).timestamp() + ), + }, + ), + call( + table_name=mns_service.table, + key="5d8683b9-1665-40d2-8499-6e8302d507ff", + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: PatientOdsInactiveStatus.DECEASED, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.fromisoformat(MOCK_UPDATE_TIME).timestamp() + ), + }, + ), + ] + mns_service.dynamo_service.update_item.assert_has_calls(calls, any_order=False) + + +def test_update_gp_ods_not_called_empty_dynamo_response(mns_service): + mns_service.dynamo_service.query_table_by_index.return_value = MOCK_EMPTY_RESPONSE + mns_service.handle_gp_change_notification(gp_change_message) + + mns_service.get_updated_gp_ods.assert_not_called() + + +def test_update_gp_ods_called_dynamo_response(mns_service): + mns_service.dynamo_service.query_table_by_index.return_value = MOCK_SEARCH_RESPONSE + mns_service.handle_gp_change_notification(gp_change_message) + + mns_service.get_updated_gp_ods.assert_called() + + +def test_update_gp_ods_not_called_ods_codes_are_the_same(mns_service): + mns_service.dynamo_service.query_table_by_index.return_value = MOCK_SEARCH_RESPONSE + mns_service.get_updated_gp_ods.return_value = TEST_CURRENT_GP_ODS + mns_service.handle_gp_change_notification(gp_change_message) + + mns_service.dynamo_service.update_item.assert_not_called() + + +@freeze_time(MOCK_UPDATE_TIME) +def test_handle_gp_change_updates_gp_ods_code(mns_service): + mns_service.dynamo_service.query_table_by_index.return_value = MOCK_SEARCH_RESPONSE + other_gp_ods = "Z12345" + mns_service.get_updated_gp_ods.return_value = other_gp_ods + mns_service.handle_gp_change_notification(gp_change_message) + + calls = [ + call( + table_name=mns_service.table, + key="3d8683b9-1665-40d2-8499-6e8302d507ff", + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: other_gp_ods, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.fromisoformat(MOCK_UPDATE_TIME).timestamp() + ), + }, + ), + call( + table_name=mns_service.table, + key="4d8683b9-1665-40d2-8499-6e8302d507ff", + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: other_gp_ods, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.fromisoformat(MOCK_UPDATE_TIME).timestamp() + ), + }, + ), + call( + table_name=mns_service.table, + key="5d8683b9-1665-40d2-8499-6e8302d507ff", + updated_fields={ + DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: other_gp_ods, + DocumentReferenceMetadataFields.LAST_UPDATED.value: int( + datetime.fromisoformat(MOCK_UPDATE_TIME).timestamp() + ), + }, + ), + ] + mns_service.dynamo_service.update_item.assert_has_calls(calls, any_order=False) + + +def test_messages_is_put_back_on_the_queue_when_pds_error_raised( + mns_service, mocker, mock_handle_gp_change +): + mns_service.handle_gp_change_notification.side_effect = PdsErrorException() + mocker.patch.object(mns_service, "send_message_back_to_queue") + mns_service.handle_mns_notification(gp_change_message) + + mns_service.send_message_back_to_queue.assert_called_with(gp_change_message)