Skip to content
This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

feat: Group export scripts #389

Merged
merged 7 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions bulkExport/extractPatientCompartmentSearchParams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const fs = require('fs');
const compartmentPatientV3 = require('./schema/compartmentdefinition-patient.3.0.2.json');
const compartmentPatientV4 = require('./schema/compartmentdefinition-patient.4.0.1.json');
const baseSearchParamsV3 = require('../../fhir-works-on-aws-search-es/src/schema/compiledSearchParameters.3.0.1.json');
const baseSearchParamsV4 = require('../../fhir-works-on-aws-search-es/src/schema/compiledSearchParameters.4.0.1.json');

// Create a dictionary of search params
/**
 * Builds, for every resource type in a Patient CompartmentDefinition, the list
 * of compiled search-parameter paths that place a resource in the compartment.
 *
 * @param {Array<{base: string, name: string, compiled: Array<{path: string}>}>} baseSearchParams
 *   Compiled search parameters. Each entry is indexed as `${base}-${name}`.
 * @param {{resource: Array<{code: string, param?: string[]}>}} compartmentPatient
 *   Compartment definition. Entries without a `param` list are skipped.
 * @returns {Object<string, string[]>} Resource type -> paths collected from the
 *   `compiled` entries of each listed param, in definition order (duplicates kept).
 * @throws {Error} When the compartment definition names a search parameter that
 *   has no compiled entry in `baseSearchParams`.
 */
function extractPatientCompartmentSearchParams(baseSearchParams, compartmentPatient) {
    // Index the compiled entries by "<resourceType>-<paramName>".
    // Example: "Account-identifier" -> [{ path: "identifier", ... }]
    const compiledByKey = new Map();
    baseSearchParams.forEach(param => {
        compiledByKey.set(`${param.base}-${param.name}`, param.compiled);
    });

    // Find the search params needed for patient compartment
    const patientCompartmentSearchParams = {};
    compartmentPatient.resource.forEach(resource => {
        if (!resource.param) {
            return;
        }
        const compiledPaths = [];
        resource.param.forEach(param => {
            const key = `${resource.code}-${param}`;
            const compiled = compiledByKey.get(key);
            // Fail with the offending key instead of an opaque "reading 'map' of undefined".
            if (!Array.isArray(compiled)) {
                throw new Error(`No compiled search parameter found for "${key}"`);
            }
            compiled.forEach(item => {
                compiledPaths.push(item.path);
            });
        });
        patientCompartmentSearchParams[resource.code] = compiledPaths;
    });
    return patientCompartmentSearchParams;
}

// Build the patient-compartment lookup for each supported FHIR version.
const patientCompartmentSearchParamsV4 = extractPatientCompartmentSearchParams(
    baseSearchParamsV4,
    compartmentPatientV4,
);
const patientCompartmentSearchParamsV3 = extractPatientCompartmentSearchParams(
    baseSearchParamsV3,
    compartmentPatientV3,
);

// The input schemas are loaded with require(), which resolves relative to this
// file. A bare './schema/...' handed to fs resolves against process.cwd()
// instead, so anchor the outputs to this file's directory as well; the script
// then writes next to its inputs no matter where it is launched from.
const outputSchemaDir = `${__dirname}/schema`;

fs.writeFileSync(
    `${outputSchemaDir}/patientCompartmentSearchParams.3.0.2.json`,
    JSON.stringify(patientCompartmentSearchParamsV3),
);
fs.writeFileSync(
    `${outputSchemaDir}/patientCompartmentSearchParams.4.0.1.json`,
    JSON.stringify(patientCompartmentSearchParamsV4),
);

// Echo the generated lookups so the run can be inspected from the console.
console.log(patientCompartmentSearchParamsV4);
console.log(patientCompartmentSearchParamsV3);
72 changes: 70 additions & 2 deletions bulkExport/glueScripts/export-script.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sys
import boto3
import re
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
Expand All @@ -24,7 +25,7 @@

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'jobId', 'exportType', 'transactionTime', 'since', 'outputFormat', 'ddbTableName', 'workerType', 'numberWorkers', 's3OutputBucket'])

# type and groupId are optional parameters
# type and tenantId are optional parameters
type = None
if ('--{}'.format('type') in sys.argv):
type = getResolvedOptions(sys.argv, ['type'])['type']
Expand All @@ -35,6 +36,14 @@
if ('--{}'.format('tenantId') in sys.argv):
tenantId = getResolvedOptions(sys.argv, ['tenantId'])['tenantId']

# the following parameters are only needed for group export
# group_id stays None for non-group exports; the group filter further down
# is skipped when `group_id is None`.
group_id = None
if ('--{}'.format('groupId') in sys.argv):
    group_id = getResolvedOptions(sys.argv, ['groupId'])['groupId']
    # NOTE(review): the three reads below are assumed to be nested under this
    # `if` (i.e. only read when --groupId is supplied) - confirm nesting.
    # Bucket and key of the patient compartment search-param JSON fetched from S3.
    s3_script_bucket = getResolvedOptions(sys.argv, ['s3ScriptBucket'])['s3ScriptBucket']
    compartment_search_param_file = getResolvedOptions(sys.argv, ['compartmentSearchParamFile'])['compartmentSearchParamFile']
    # Passed to is_internal_reference to recognise absolute references to this server.
    server_url = getResolvedOptions(sys.argv, ['serverUrl'])['serverUrl']
Bingjiling marked this conversation as resolved.
Show resolved Hide resolved

job_id = args['jobId']
export_type = args['exportType']
transaction_time = args['transactionTime']
Expand Down Expand Up @@ -91,11 +100,70 @@ def remove_composite_id(resource):
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
)

print ('start filtering by group_id')
def is_active_group_member(member, datetime_transaction_time):
    """Return True when a Group member entry counts as active for this export.

    A member is rejected when it is flagged ``inactive``, or when its
    membership ``period.end`` lies before the export transaction time (the
    membership had already ended when the export was requested).

    Args:
        member: One entry of the Group's ``member`` list. Fields are read with
            ``getattr``, so a missing ``inactive``/``period``/``end`` is
            treated as "not set".
        datetime_transaction_time: ``datetime`` the period end is compared to.

    Returns:
        bool: False for inactive or already-ended memberships, True otherwise.

    Raises:
        ValueError: If ``period.end`` does not match
            ``%Y-%m-%dT%H:%M:%S.%fZ`` (raised by ``datetime.strptime``).
    """
    if getattr(member, 'inactive', None) == True:
        return False
    member_period = getattr(member, 'period', None)
    if member_period is not None:
        end_date = getattr(member_period, 'end', None)
        # A membership that ends in the future is still active; only an end
        # date earlier than the transaction time excludes the member.
        if end_date is not None and datetime.strptime(end_date, "%Y-%m-%dT%H:%M:%S.%fZ") < datetime_transaction_time:
            return False
    return True

def is_internal_reference(reference, server_url):
    """Tell whether a FHIR reference points at a resource on this server.

    A reference is internal when, after an optional leading ``server_url`` is
    stripped, it consists of exactly two ``/``-separated segments
    (``<ResourceType>/<id>``).

    Args:
        reference: Reference string, relative (``Patient/123``) or absolute
            (``<server_url>/Patient/123``). Non-string values yield False.
        server_url: Base URL of this server, with or without a trailing slash.

    Returns:
        bool: True for ``<type>/<id>`` references local to this server.
    """
    if not isinstance(reference, str):
        return False
    if server_url:
        # Compare against the URL with exactly one trailing slash so that both
        # "https://host/dev" and "https://host/dev/" work, and so that
        # "https://host/dev-other/..." is not mistaken for this server.
        base_url = server_url.rstrip('/') + '/'
        if reference.startswith(base_url):
            # Slice rather than str.removeprefix(), which needs Python 3.9+.
            reference = reference[len(base_url):]
    return len(reference.split('/')) == 2

def deep_get(resource, path):
    """Follow ``path`` through nested mappings and return the final ``reference``.

    For ``path == ['a', 'b']`` this returns ``resource['a']['b']['reference']``.

    Args:
        resource: Mapping-like record to read from (may be None).
        path: Sequence of keys to follow, e.g. ``'subject'.split('.')``. It is
            not modified.

    Returns:
        The value stored under ``reference`` at the end of the path, or None
        when the resource is None, the path is empty, any key along the way is
        missing or None, or an intermediate value cannot be subscripted with a
        string key (for example a list-valued field; lists are not traversed).
    """
    if resource is None or not path:
        return None
    current = resource
    # Walk every key of the path, then read the 'reference' field of the result.
    for key in list(path) + ['reference']:
        try:
            current = current[key]
        except (KeyError, TypeError, IndexError):
            return None
        if current is None:
            return None
    return current

def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url):
    """Decide whether a resource belongs in a group export.

    A resource is included when it is itself a member of the group, or when
    one of its patient-compartment reference paths points (via an internal
    reference) at a patient that is a member of the group.

    Args:
        resource: Mapping-like record with at least ``id`` and ``resourceType``.
        group_member_ids: Ids of all active group members.
        group_patient_ids: Ids of the active group members that are patients.
        compartment_search_params: Mapping of resource type to a list of
            dotted paths whose ``reference`` places a resource in the
            patient compartment.
        server_url: Base URL used to recognise absolute internal references.

    Returns:
        bool: True when the resource should be exported for the group.
    """
    # Check if resource is part of the group
    if resource['id'] in group_member_ids:
        return True
    # Check if resource is part of the patient compartment
    # (subscript access, consistent with resource['id'] above).
    resource_type = resource['resourceType']
    if resource_type in compartment_search_params:
        # Get inclusion criteria paths for the resource
        for path in compartment_search_params[resource_type]:
            reference = deep_get(resource, path.split("."))
            if reference is None:
                # The resource does not populate this path; try the next one.
                continue
            if is_internal_reference(reference, server_url) and reference.split('/')[-1] in group_patient_ids:
                return True
    return False

# Restrict the export to the group's members and their patient compartments.
# Without a group id the date-filtered frame is passed through unchanged.
if group_id is None:
    filtered_group_frame = filtered_dates_dyn_frame
else:
    print('Loading patient compartment search params')
    client = boto3.client('s3')
    s3Obj = client.get_object(Bucket = s3_script_bucket,
                              Key = compartment_search_param_file)
    compartment_search_params = json.load(s3Obj['Body'])

    print('Extract group member ids')
    # Match on resourceType as well as id so that only a Group record (the
    # only type carrying `member`) can be selected.
    # NOTE(review): the Group is looked up in the date-filtered frame, so a
    # Group last updated outside the export window would presumably not be
    # found here - confirm this is intended.
    # NOTE(review): if several versions of the Group are present, the first
    # collected row is used - confirm ordering is irrelevant.
    matching_groups = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: x['resourceType'] == 'Group' and x['id'] == group_id).toDF().collect()
    if not matching_groups:
        # Surface the missing group explicitly instead of a bare IndexError.
        raise ValueError('Group {} was not found among the resources selected for export'.format(group_id))
    # A Group without members yields a null `member`; treat it as empty.
    group_members = matching_groups[0]['member'] or []
    active_group_member_references = [x['entity']['reference'] for x in group_members if is_active_group_member(x, datetime_transaction_time) and is_internal_reference(x['entity']['reference'], server_url)]
    # The id is the last segment of "<type>/<id>"; the segment before it is the type.
    group_member_ids = {x.split('/')[-1] for x in active_group_member_references}
    group_patient_ids = {x.split('/')[-1] for x in active_group_member_references if x.split('/')[-2] == 'Patient'}

    print('Extract group member and patient compartment dataframe')
    filtered_group_frame = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))


print('Start filtering by documentStatus and resourceType')
# Filter by resource listed in Type and with correct STATUS
type_list = None if type == None else set(type.split(','))
valid_document_state_to_be_read_from = {'AVAILABLE','LOCKED', 'PENDING_DELETE'}
filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_dates_dyn_frame,
filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_group_frame,
f = lambda x:
x["documentStatus"] in valid_document_state_to_be_read_from if type_list is None
else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list
Expand Down
Loading