Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PRMP-760 Change to ICB release date logic #21

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 70 additions & 43 deletions lambda/bulk-ods-update/bulk_ods_update.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import os
import tempfile
from datetime import date, timedelta
import calendar
import csv
import logging
import os
import tempfile
from datetime import date, timedelta, datetime

import boto3

from utils.enums.trud import OdsDownloadType, TrudItem
from utils.models.ods_models import PracticeOds, IcbOds
from utils.services.ssm_service import SsmSecretManager
from utils.services.trud_api_service import TrudApiService

import logging

from utils.trud_files import (
GP_FILE_HEADERS,
ICB_FILE_HEADERS,
Expand All @@ -31,44 +29,54 @@


def lambda_handler(event, context):
download_type = determine_ods_manifest_download_type()
ssm = boto3.client("ssm")
trud_api_key_param = os.environ.get("TRUD_API_KEY_PARAM_NAME")
ssm_service = SsmSecretManager(ssm)
trud_api_key = ssm_service.get_secret(trud_api_key_param) if trud_api_key_param else ""
trud_service = TrudApiService(
api_key=trud_api_key,
api_url=os.environ.get("TRUD_FHIR_API_URL_PARAM_NAME"),
)

extract_and_process_ods_gp_data(trud_service)
try:
ssm = boto3.client("ssm")
trud_api_key_param = os.environ.get("TRUD_API_KEY_PARAM_NAME")
ssm_service = SsmSecretManager(ssm)
trud_api_key = ssm_service.get_secret(trud_api_key_param) if trud_api_key_param else ""
trud_service = TrudApiService(
api_key=trud_api_key,
api_url=os.environ.get("TRUD_FHIR_API_URL_PARAM_NAME"),
)

if download_type == OdsDownloadType.BOTH:
extract_and_process_ods_gp_data(trud_service)
extract_and_process_ods_icb_data(trud_service)

return {"statusCode": 200}
return {"statusCode": 200}
except Exception as e:
logger.info(f"An unexpected error occurred: {e}")
return {"statusCode": 400}


def determine_ods_manifest_download_type() -> OdsDownloadType:
logger.info("Determining download type")
today = date.today()
def find_date_for_friday(start_date):
last_friday = start_date
while last_friday.weekday() != 4:
last_friday -= timedelta(days=1)
return last_friday

total_days_in_month = calendar.monthrange(today.year, today.month)[1]
last_date_of_month = date(today.year, today.month, total_days_in_month)

last_sunday_of_month = last_date_of_month
def determine_if_last_friday_was_the_last_friday_of_the_month():
logger.info("Determining if last friday date was the last friday of the month")
today = date.today()
last_friday = find_date_for_friday(today)

total_days_in_month = calendar.monthrange(last_friday.year, last_friday.month)[1]
last_date_of_month = date(last_friday.year, last_friday.month, total_days_in_month)

while last_sunday_of_month.weekday() != 6:
last_sunday_of_month -= timedelta(days=1)
last_friday_of_month = find_date_for_friday(last_date_of_month)

is_icb_download_date = today == last_sunday_of_month
return last_friday == last_friday_of_month

if is_icb_download_date:
logger.info("Download type set to: GP and ICB")
return OdsDownloadType.BOTH

logger.info("Download type set to: GP")
return OdsDownloadType.GP
def check_release_date_within_last_7_days(release_date_string: str):
try:
release_date = datetime.strptime(release_date_string, '%Y-%m-%d').date()
today = date.today()
days_since_release = today - release_date
return days_since_release <= timedelta(days=7)
except (TypeError, ValueError):
logger.info("failed to check release date")
raise UnableToGetReleaseDate()


def extract_and_process_ods_gp_data(trud_service: TrudApiService):
Expand All @@ -77,11 +85,11 @@ def extract_and_process_ods_gp_data(trud_service: TrudApiService):
gp_ods_releases = trud_service.get_release_list(
TrudItem.NHS_ODS_WEEKLY, is_latest=True
)

gp_ods_latest_releases = gp_ods_releases[0]
logger.info(gp_ods_releases)

download_file_bytes = trud_service.get_download_file(
gp_ods_releases[0].get("archiveFileUrl")
gp_ods_latest_releases.get("archiveFileUrl")
)

eppracur_csv_path = os.path.join(TEMP_DIR, GP_WEEKLY_FILE_NAME)
Expand All @@ -105,16 +113,27 @@ def extract_and_process_ods_gp_data(trud_service: TrudApiService):


def extract_and_process_ods_icb_data(trud_service: TrudApiService):
logger.info("Extracting and processing ODS ICB data")
logger.info("Getting latest ICB release details")

icb_ods_releases = trud_service.get_release_list(
TrudItem.ORG_REF_DATA_MONTHLY, True
)
icb_ods_latest_release = icb_ods_releases[0]
logger.info(icb_ods_releases)

is_quarterly_release = icb_ods_releases[0].get("name").endswith(".0.0")
try:
release_date_string = icb_ods_latest_release.get("releaseDate")
is_latest_icb_download_recent = check_release_date_within_last_7_days(release_date_string)
except UnableToGetReleaseDate:
is_latest_icb_download_recent = determine_if_last_friday_was_the_last_friday_of_the_month()

if not is_latest_icb_download_recent:
logger.info("No ICB download for this week, skipping download")
return None

logger.info("Proceeding to download ICB data")
is_quarterly_release = icb_ods_latest_release.get("name").endswith(".0.0")
download_file = trud_service.get_download_file(
icb_ods_releases[0].get("archiveFileUrl")
icb_ods_latest_release.get("archiveFileUrl")
)

icb_zip_file_path = (
Expand All @@ -125,11 +144,13 @@ def extract_and_process_ods_icb_data(trud_service: TrudApiService):
)

icb_ods_data_amended_data = []
logger.info("Extracting and processing ODS ICB data")

if icb_zip_file := trud_service.unzipping_files(
download_file, icb_zip_file_path, TEMP_DIR, True
download_file, icb_zip_file_path, TEMP_DIR, True
):
if icb_csv_file := trud_service.unzipping_files(
icb_zip_file, icb_csv_file_name, TEMP_DIR
icb_zip_file, icb_csv_file_name, TEMP_DIR
):
icb_ods_data = trud_csv_to_dict(icb_csv_file, ICB_FILE_HEADERS)
icb_ods_data_amended_data = get_amended_records(icb_ods_data)
Expand Down Expand Up @@ -176,7 +197,8 @@ def compare_and_overwrite(download_type: OdsDownloadType, data: list[dict]):
PracticeOds.icb_ods_code.set(amended_record.get("IcbOdsCode")),
]
)
logger.info(f'Overwriting for ODS: {amended_record.get("PracticeOdsCode")} - Name: {amended_record.get("PracticeName")} | ICB: {amended_record.get("IcbOdsCode")}')
logger.info(
f'Overwriting for ODS: {amended_record.get("PracticeOdsCode")} - Name: {amended_record.get("PracticeName")} | ICB: {amended_record.get("IcbOdsCode")}')
except Exception as e:
logger.info(
f"Failed to create/update record by Practice ODS code: {str(e)}"
Expand All @@ -188,6 +210,11 @@ def compare_and_overwrite(download_type: OdsDownloadType, data: list[dict]):
try:
icb = IcbOds(amended_record.get("IcbOdsCode"))
icb.update(actions=[IcbOds.icb_name.set(amended_record.get("IcbName"))])
logger.info(f'Overwriting for ODS: {amended_record.get("IcbOdsCode")} - Name: {amended_record.get("IcbName")}')
logger.info(
f'Overwriting for ODS: {amended_record.get("IcbOdsCode")} - Name: {amended_record.get("IcbName")}')
except Exception as e:
logger.info(f"Failed to create/update record by ICB ODS code: {str(e)}")


class UnableToGetReleaseDate(Exception):
pass
7 changes: 3 additions & 4 deletions lambda/event-enrichment/event_enrichment_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ def _publish_enriched_events_to_sns_topic(enriched_events: list):


def _fetch_supplier_details(practice_ods_code: str) -> dict:

if not practice_ods_code or practice_ods_code.isspace():
return EMPTY_ORGANISATION

Expand Down Expand Up @@ -315,9 +314,9 @@ def _fetch_supplier_details(practice_ods_code: str) -> dict:

def _has_supplier_ods_code(extension: dict) -> bool:
return (
"SDS-ManufacturingOrganisation" in extension["url"]
and "ods-organization-code"
in extension["valueReference"]["identifier"]["system"]
"SDS-ManufacturingOrganisation" in extension["url"]
and "ods-organization-code"
in extension["valueReference"]["identifier"]["system"]
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ data "aws_ssm_parameter" "trud_api_endpoint" {
resource "aws_cloudwatch_event_rule" "ods_bulk_update_schedule" {
name = "${var.environment}_ods_bulk_update_schedule"
description = "Schedule for ODS Update Lambda"
schedule_expression = "cron(0 2 ? * 1 *)"
schedule_expression = "cron(0 4 ? * 1 *)"
}

resource "aws_cloudwatch_event_target" "ods_bulk_update_schedule_event" {
Expand Down
9 changes: 4 additions & 5 deletions utils/services/trud_api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from io import BytesIO
from zipfile import ZipFile
import urllib3
from urllib3.exceptions import HTTPError
from urllib3.util.retry import Retry

import logging
Expand Down Expand Up @@ -37,9 +38,8 @@ def get_release_list(self, item_number: TrudItem, is_latest=False):
trud_response.release_conn()

return response
except Exception as e:
except HTTPError as e:
logger.info(f"An unexpected error occurred: {e}")
raise e

def get_download_url_by_release(
self, releases_list, break_at_quarterly_release=True
Expand All @@ -54,11 +54,10 @@ def get_download_url_by_release(
def get_download_file(self, download_url):
try:
download_response = self.http.request("GET", download_url)
logger.info(download_response)
logger.info(download_response.json())
return download_response.data
except Exception as e:
except HTTPError as e:
logger.info(f"An unexpected error occurred: {e}")
raise e

def unzipping_files(self, zip_file, path=None, path_to_extract=None, byte: bool = False):
myzip = ZipFile(BytesIO(zip_file) if byte else zip_file)
Expand Down
Loading