-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PRMP-1188 Create a lambda to handle MNS notifications #470
base: main
Are you sure you want to change the base?
Changes from 33 commits
d11669e
76970c3
bc0fe45
81707f5
aa21f1e
f970374
d608e19
e7b69bb
488b81b
8c17876
104f8c1
d47671e
41cf2f4
50c8573
395c46e
5bcc4fd
68b676a
dc50dce
5d90083
681a5d6
48b54d0
1f94d46
caaf8bd
4ffb470
41b6ca3
7c0fe28
7943515
8422e69
bbfd720
f21a205
d1e93c2
9a8af85
c34ad77
85ea123
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,18 +16,17 @@ on: | |
AWS_ASSUME_ROLE: | ||
required: true | ||
permissions: | ||
pull-requests: write | ||
id-token: write # This is required for requesting the JWT | ||
contents: read # This is required for actions/checkout | ||
jobs: | ||
batch_update_build_docker_image: | ||
placeholder: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we still want this to be a placeholder? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the workflow that we want is currently on a different ticket that hasn't yet been merged in. |
||
runs-on: ubuntu-latest | ||
environment: ${{ inputs.environment }} | ||
defaults: | ||
run: | ||
working-directory: lambdas | ||
steps: | ||
- name: Placeholder | ||
run: | | ||
echo "Running placeholder job on ${inputs.sandbox}" | ||
run: | | ||
echo "Running placeholder job on ${inputs.sandbox}" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from enum import StrEnum | ||
|
||
|
||
class MNSNotificationTypes(StrEnum): | ||
CHANGE_OF_GP = "pds-change-of-gp-1" | ||
oliverbeumkes-nhs marked this conversation as resolved.
Show resolved
Hide resolved
|
||
DEATH_NOTIFICATION = "pds-death-notification-1" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import json | ||
|
||
from enums.mns_notification_types import MNSNotificationTypes | ||
from models.mns_sqs_message import MNSSQSMessage | ||
from pydantic_core._pydantic_core import ValidationError | ||
from services.process_mns_message_service import MNSNotificationService | ||
from utils.audit_logging_setup import LoggingService | ||
from utils.decorators.ensure_env_var import ensure_environment_variables | ||
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions | ||
from utils.decorators.override_error_check import override_error_check | ||
from utils.decorators.set_audit_arg import set_request_context_for_logging | ||
from utils.request_context import request_context | ||
|
||
logger = LoggingService(__name__) | ||
|
||
|
||
@set_request_context_for_logging | ||
@ensure_environment_variables( | ||
names=[ | ||
"APPCONFIG_CONFIGURATION", | ||
"APPCONFIG_ENVIRONMENT", | ||
"LLOYD_GEORGE_DYNAMODB_NAME", | ||
"MNS_NOTIFICATION_QUEUE_URL", | ||
] | ||
) | ||
@override_error_check | ||
@handle_lambda_exceptions | ||
def lambda_handler(event, context): | ||
logger.info(f"Received MNS notification event: {event}") | ||
notification_service = MNSNotificationService() | ||
sqs_messages = event.get("Records", []) | ||
|
||
for sqs_message in sqs_messages: | ||
try: | ||
sqs_message = json.loads(sqs_message["body"]) | ||
|
||
mns_message = MNSSQSMessage(**sqs_message) | ||
MNSSQSMessage.model_validate(mns_message) | ||
|
||
NogaNHS marked this conversation as resolved.
Show resolved
Hide resolved
|
||
request_context.patient_nhs_no = mns_message.subject.nhs_number | ||
|
||
if mns_message.type in MNSNotificationTypes.__members__.values(): | ||
notification_service.handle_mns_notification(mns_message) | ||
|
||
except ValidationError as error: | ||
logger.error("Malformed MNS notification message") | ||
logger.error(error) | ||
except Exception as error: | ||
abbas-khan10 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logger.error(f"Error processing SQS message: {error}.") | ||
logger.info("Continuing to next message.") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from pydantic import AliasGenerator, BaseModel, ConfigDict | ||
from pydantic.alias_generators import to_camel | ||
|
||
|
||
class MNSMessageSubject(BaseModel): | ||
model_config = ConfigDict( | ||
alias_generator=AliasGenerator( | ||
validation_alias=to_camel, serialization_alias=to_camel | ||
), | ||
) | ||
nhs_number: str | ||
|
||
|
||
class MNSSQSMessage(BaseModel): | ||
model_config = ConfigDict( | ||
alias_generator=AliasGenerator( | ||
validation_alias=to_camel, serialization_alias=to_camel | ||
), | ||
) | ||
id: str | ||
type: str | ||
subject: MNSMessageSubject | ||
data: dict |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import os | ||
from datetime import datetime | ||
|
||
from botocore.exceptions import ClientError | ||
from enums.death_notification_status import DeathNotificationStatus | ||
from enums.metadata_field_names import DocumentReferenceMetadataFields | ||
from enums.mns_notification_types import MNSNotificationTypes | ||
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus | ||
from models.mns_sqs_message import MNSSQSMessage | ||
from services.base.dynamo_service import DynamoDBService | ||
from services.base.sqs_service import SQSService | ||
from utils.audit_logging_setup import LoggingService | ||
from utils.exceptions import PdsErrorException | ||
from utils.utilities import get_pds_service | ||
|
||
logger = LoggingService(__name__) | ||
|
||
|
||
class MNSNotificationService: | ||
def __init__(self): | ||
self.dynamo_service = DynamoDBService() | ||
self.table = os.getenv("LLOYD_GEORGE_DYNAMODB_NAME") | ||
self.pds_service = get_pds_service() | ||
self.sqs_service = SQSService() | ||
self.queue = os.getenv("MNS_NOTIFICATION_QUEUE_URL") | ||
|
||
def handle_mns_notification(self, message: MNSSQSMessage): | ||
try: | ||
match message.type: | ||
case MNSNotificationTypes.CHANGE_OF_GP: | ||
logger.info("Handling GP change notification.") | ||
self.handle_gp_change_notification(message) | ||
case MNSNotificationTypes.DEATH_NOTIFICATION: | ||
logger.info("Handling death status notification.") | ||
self.handle_death_notification(message) | ||
|
||
except PdsErrorException: | ||
logger.info("An error occurred when calling PDS") | ||
self.send_message_back_to_queue(message) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add a log saying we're putting the message back? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this queue working in a different way? I've never had to put anything back on a message queue manually, it's always been done on an acknowledgement system. |
||
|
||
except ClientError as e: | ||
logger.info( | ||
f"Unable to process message: {message.id}, of type: {message.type}" | ||
) | ||
logger.info(f"{e}") | ||
|
||
def handle_gp_change_notification(self, message: MNSSQSMessage): | ||
patient_document_references = self.get_patient_documents( | ||
message.subject.nhs_number | ||
) | ||
|
||
if not self.patient_is_present_in_ndr(patient_document_references): | ||
return | ||
Comment on lines
+48
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that both the change of GP and the death notifications (and I'd imagine any potential future notifications) involve this step of pulling out the document references and then checking for the presence of this patient in the NDR, I'd suggest we pull these lines out and do this check on line 29 before the |
||
|
||
updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number) | ||
|
||
for reference in patient_document_references: | ||
if reference["CurrentGpOds"] is not updated_ods_code: | ||
self.dynamo_service.update_item( | ||
table_name=self.table, | ||
key=reference["ID"], | ||
updated_fields={ | ||
DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: updated_ods_code, | ||
DocumentReferenceMetadataFields.LAST_UPDATED.value: int( | ||
datetime.now().timestamp() | ||
), | ||
}, | ||
) | ||
|
||
logger.info("Update complete for change of GP") | ||
|
||
def handle_death_notification(self, message: MNSSQSMessage): | ||
death_notification_type = message.data["deathNotificationStatus"] | ||
match death_notification_type: | ||
case DeathNotificationStatus.INFORMAL: | ||
logger.info( | ||
"Patient is deceased - INFORMAL, moving on to the next message." | ||
) | ||
return | ||
|
||
case DeathNotificationStatus.REMOVED: | ||
patient_documents = self.get_patient_documents( | ||
message.subject.nhs_number | ||
) | ||
if not self.patient_is_present_in_ndr(patient_documents): | ||
return | ||
|
||
updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number) | ||
self.update_patient_ods_code(patient_documents, updated_ods_code) | ||
logger.info("Update complete for death notification change.") | ||
|
||
case DeathNotificationStatus.FORMAL: | ||
patient_documents = self.get_patient_documents( | ||
message.subject.nhs_number | ||
) | ||
if not self.patient_is_present_in_ndr(patient_documents): | ||
return | ||
|
||
self.update_patient_ods_code( | ||
patient_documents, PatientOdsInactiveStatus.DECEASED | ||
) | ||
logger.info( | ||
f"Update complete, patient marked {PatientOdsInactiveStatus.DECEASED}." | ||
) | ||
|
||
def get_patient_documents(self, nhs_number: str): | ||
logger.info("Getting patient document references...") | ||
response = self.dynamo_service.query_table_by_index( | ||
table_name=self.table, | ||
index_name="NhsNumberIndex", | ||
search_key="NhsNumber", | ||
search_condition=nhs_number, | ||
) | ||
steph-torres-nhs marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return response["Items"] | ||
|
||
def update_patient_ods_code(self, patient_documents: list[dict], code: str) -> None: | ||
for document in patient_documents: | ||
logger.info("Updating patient document reference...") | ||
self.dynamo_service.update_item( | ||
table_name=self.table, | ||
key=document["ID"], | ||
updated_fields={ | ||
DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: code, | ||
DocumentReferenceMetadataFields.LAST_UPDATED.value: int( | ||
datetime.now().timestamp() | ||
), | ||
}, | ||
) | ||
|
||
def get_updated_gp_ods(self, nhs_number: str) -> str: | ||
patient_details = self.pds_service.fetch_patient_details(nhs_number) | ||
return patient_details.general_practice_ods | ||
|
||
def send_message_back_to_queue(self, message: MNSSQSMessage): | ||
logger.info("Sending message back to queue...") | ||
self.sqs_service.send_message_standard( | ||
queue_url=self.queue, message_body=message.model_dump_json(by_alias=True) | ||
) | ||
|
||
def patient_is_present_in_ndr(self, dynamo_response): | ||
if len(dynamo_response) < 1: | ||
logger.info("Patient is not held in the National Document Repository.") | ||
logger.info("Moving on to the next message.") | ||
return False | ||
else: | ||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a condition for when the env is a sandbox?