Skip to content
This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

Commit

Permalink
feat: add transitive reference to group export (#475)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bingjiling authored Oct 10, 2021
1 parent 2274e8c commit 3c4c57e
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 36 deletions.
2 changes: 1 addition & 1 deletion bulkExport/extractPatientCompartmentSearchParams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Run the script:
The compartment definition files are downloaded from the following URL and saved in folder bulkExport/schema:
compartmentdefinition-patient.4.0.1.json: https://www.hl7.org/fhir/compartmentdefinition-patient.json.html
compartmentdefinition-patient.4.0.1.json: https://www.hl7.org/fhir/compartmentdefinition-patient.json.html (Note Device is added to this file due to Inferno test <BDGV-23: Medication resources returned conform to the US Core Medication Profile if bulk data export has Medication resources>)
compartmentdefinition-patient.3.0.2.json: http://hl7.org/fhir/stu3/compartmentdefinition-patient.json.html (Note the AuditEvent and Provenance fields in this file are updated to remove dotted path)
*/

Expand Down
66 changes: 51 additions & 15 deletions bulkExport/glueScripts/export-script.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
s3_script_bucket = getResolvedOptions(sys.argv, ['s3ScriptBucket'])['s3ScriptBucket']
compartment_search_param_file = getResolvedOptions(sys.argv, ['compartmentSearchParamFile'])['compartmentSearchParamFile']
server_url = getResolvedOptions(sys.argv, ['serverUrl'])['serverUrl']
transitive_reference_param_file = "transitiveReferenceParams.json"

job_id = args['jobId']
export_type = args['exportType']
Expand Down Expand Up @@ -109,58 +110,93 @@ def is_internal_reference(reference, server_url):
return False

def deep_get(resource, path):
temp = resource
temp = [resource]
for p in path:
if temp is None:
return None
temp = temp[p]
new_temp = []
for item in temp:
if item is None or p not in item:
continue
if isinstance(item[p], list):
new_temp.extend(item[p])
else:
new_temp.append(item[p])
temp = new_temp
return temp

def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url):
def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url, transitive_reference_ids=set()):
# Check if resource is part of the group
if resource['id'] in group_member_ids:
if resource['id'] in group_member_ids or resource['id'] in transitive_reference_ids:
return True
# Check if resource is part of the patient compartment
if resource['resourceType'] in compartment_search_params:
# Get inclusion criteria paths for the resource
inclusion_paths = compartment_search_params[resource['resourceType']]
for path in inclusion_paths:
reference = deep_get(resource, path.split("."))
if isinstance(reference, dict):
reference = [reference]
elif not isinstance(reference, list):
return False # Inclusion criteria should point to a dict {reference: 'Patient/1234'} or a list of references
for ref in reference:
if is_internal_reference(ref['reference'], server_url) and ref['reference'].split('/')[-1] in group_patient_ids:
return True
return False

# Transitive reference are resources referenced in group members or patient compartment resources
# Transitive reference is extracted based on file bulkExport/schema/transitiveReferenceParams.json
# Update the file and redeploy to change transitive reference to be included
def get_transitive_references(resource, transitive_reference_map, server_url):
if resource['resourceType'] in transitive_reference_map:
path_map = transitive_reference_map[resource['resourceType']]
generated_transitive_refs = []
for path, target_type in path_map.items():
targets = deep_get(resource, path.split('.'))
generated_transitive_refs.extend([target['reference'] for target in targets if is_internal_reference(target['reference'], server_url)])
resource['_generated_transitive_refs'] = generated_transitive_refs if len(generated_transitive_refs) !=0 else None
return resource

datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")

if (group_id is None):
filtered_group_frame = filtered_tenant_id_frame
else:
print('Loading patient compartment search params')
client = boto3.client('s3')
s3Obj = client.get_object(Bucket = s3_script_bucket,
s3Obj_compartment = client.get_object(Bucket = s3_script_bucket,
Key = compartment_search_param_file)
compartment_search_params = json.load(s3Obj['Body'])
compartment_search_params = json.load(s3Obj_compartment['Body'])

s3Obj_transitive = client.get_object(Bucket = s3_script_bucket,
Key = transitive_reference_param_file)
transitive_reference_params = json.load(s3Obj_transitive['Body'])

print('Extract group member ids')
group_members = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: x['id'] == group_id).toDF().sort("meta.versionId").collect()[-1]['member']
active_group_member_references = [x['entity']['reference'] for x in group_members if is_active_group_member(x, datetime_transaction_time) and is_internal_reference(x['entity']['reference'], server_url)]
group_member_ids = set([x.split('/')[-1] for x in active_group_member_references])
group_patient_ids = set([x.split('/')[-1] for x in active_group_member_references if x.split('/')[-2] == 'Patient'])
print(group_member_ids)
print(group_patient_ids)
print('Group member ids extracted: ', group_member_ids)
print('Group patient ids extracted: ', group_patient_ids)

print('Extract group member and patient compartment dataframe')
filtered_group_frame = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))

filtered_group_reference_frame = filtered_group_frame
if transitive_reference_params:
print('Extract transitive references')
transitive_reference_frame = Map.apply(frame = filtered_group_frame, f = lambda x: get_transitive_references(x, transitive_reference_params, server_url))
transitive_reference_frame = Filter.apply(frame = transitive_reference_frame, f = lambda x: x['_generated_transitive_refs'] is not None)
transitive_reference_frame = SelectFields.apply(frame = transitive_reference_frame, paths=['_generated_transitive_refs']).toDF().collect()

transitive_reference_set = set()
for item in transitive_reference_frame:
transitive_reference_set.update(reference.split('/')[-1] for reference in item['_generated_transitive_refs'])
print('Transitive reference ids extracted: ', transitive_reference_set)

if transitive_reference_set:
# Filter here again from tenant_id_frame to include group members, patient compartment and transitive reference
filtered_group_reference_frame = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url, transitive_reference_ids=transitive_reference_set))

print('Start filtering by transactionTime and Since')
# Filter by transactionTime and Since
datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
filtered_dates_dyn_frame = Filter.apply(frame = filtered_group_frame,
filtered_dates_dyn_frame = Filter.apply(frame = filtered_group_reference_frame,
f = lambda x:
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"Account":["subject"],"AdverseEvent":["subject"],"AllergyIntolerance":["patient","recorder","asserter"],"Appointment":["participant.actor"],"AppointmentResponse":["actor"],"AuditEvent":["agent.who","entity.what"],"Basic":["subject","author"],"BodyStructure":["patient"],"CarePlan":["subject","activity.detail.performer"],"CareTeam":["subject","participant.member"],"ChargeItem":["subject"],"Claim":["patient","payee.party"],"ClaimResponse":["patient"],"ClinicalImpression":["subject"],"Communication":["subject","sender","recipient"],"CommunicationRequest":["subject","sender","recipient","requester"],"Composition":["subject","author","attester.party"],"Condition":["subject","asserter"],"Consent":["patient"],"Coverage":["policyHolder","subscriber","beneficiary","payor"],"CoverageEligibilityRequest":["patient"],"CoverageEligibilityResponse":["patient"],"DetectedIssue":["patient"],"DeviceRequest":["subject","performer"],"DeviceUseStatement":["subject"],"DiagnosticReport":["subject"],"DocumentManifest":["subject","author","recipient"],"DocumentReference":["subject","author"],"Encounter":["subject"],"EnrollmentRequest":["candidate"],"EpisodeOfCare":["patient"],"ExplanationOfBenefit":["patient","payee.party"],"FamilyMemberHistory":["patient"],"Flag":["subject"],"Goal":["subject"],"Group":["member.entity"],"ImagingStudy":["subject"],"Immunization":["patient"],"ImmunizationEvaluation":["patient"],"ImmunizationRecommendation":["patient"],"Invoice":["subject","subject","recipient"],"List":["subject","source"],"MeasureReport":["subject"],"Media":["subject"],"MedicationAdministration":["subject","performer.actor","subject"],"MedicationDispense":["subject","subject","receiver"],"MedicationRequest":["subject"],"MedicationStatement":["subject"],"MolecularSequence":["patient"],"NutritionOrder":["patient"],"Observation":["subject","performer"],"Patient":["link.other"],"Person":["link.target"],"Procedure":["subject","performer.actor"],"Provenance":["target"],"QuestionnaireResponse":["subject","author"],"RelatedPerson":["patient"],"RequestGroup":["subject","action.participant"],"ResearchSubject":["individual"],"RiskAssessment":["subject"],"Schedule":["actor"],"ServiceRequest":["subject","performer"],"Specimen":["subject"],"SupplyDelivery":["patient"],"SupplyRequest":["deliverTo"],"VisionPrescription":["patient"]}
{"Account":["subject"],"AdverseEvent":["subject"],"AllergyIntolerance":["patient","recorder","asserter"],"Appointment":["participant.actor"],"AppointmentResponse":["actor"],"AuditEvent":["agent.who","entity.what"],"Basic":["subject","author"],"BodyStructure":["patient"],"CarePlan":["subject","activity.detail.performer"],"CareTeam":["subject","participant.member"],"ChargeItem":["subject"],"Claim":["patient","payee.party"],"ClaimResponse":["patient"],"ClinicalImpression":["subject"],"Communication":["subject","sender","recipient"],"CommunicationRequest":["subject","sender","recipient","requester"],"Composition":["subject","author","attester.party"],"Condition":["subject","asserter"],"Consent":["patient"],"Coverage":["policyHolder","subscriber","beneficiary","payor"],"CoverageEligibilityRequest":["patient"],"CoverageEligibilityResponse":["patient"],"DetectedIssue":["patient"],"Device":["patient"],"DeviceRequest":["subject","performer"],"DeviceUseStatement":["subject"],"DiagnosticReport":["subject"],"DocumentManifest":["subject","author","recipient"],"DocumentReference":["subject","author"],"Encounter":["subject"],"EnrollmentRequest":["candidate"],"EpisodeOfCare":["patient"],"ExplanationOfBenefit":["patient","payee.party"],"FamilyMemberHistory":["patient"],"Flag":["subject"],"Goal":["subject"],"Group":["member.entity"],"ImagingStudy":["subject"],"Immunization":["patient"],"ImmunizationEvaluation":["patient"],"ImmunizationRecommendation":["patient"],"Invoice":["subject","subject","recipient"],"List":["subject","source"],"MeasureReport":["subject"],"Media":["subject"],"MedicationAdministration":["subject","performer.actor","subject"],"MedicationDispense":["subject","subject","receiver"],"MedicationRequest":["subject"],"MedicationStatement":["subject"],"MolecularSequence":["patient"],"NutritionOrder":["patient"],"Observation":["subject","performer"],"Patient":["link.other"],"Person":["link.target"],"Procedure":["subject","performer.actor"],"Provenance":["target"],"QuestionnaireResponse":["subject","author"],"RelatedPerson":["patient"],"RequestGroup":["subject","action.participant"],"ResearchSubject":["individual"],"RiskAssessment":["subject"],"Schedule":["actor"],"ServiceRequest":["subject","performer"],"Specimen":["subject"],"SupplyDelivery":["patient"],"SupplyRequest":["deliverTo"],"VisionPrescription":["patient"]}
15 changes: 15 additions & 0 deletions bulkExport/schema/transitiveReferenceParams.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"CareTeam": {
"participant.member": "Practitioner",
"managingOrganization": "Organization"
},
"Patient": {
"managingOrganization": "Organization"
},
"DiagnosticReport": {
"performer": "Organization"
},
"Encounter": {
"location.location": "Location"
}
}
4 changes: 4 additions & 0 deletions bulkExport/uploadGlueScriptsToS3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ exports.handler = async (event: any) => {
filename: 'patientCompartmentSearchParams.4.0.1.json',
path: 'bulkExport/schema/patientCompartmentSearchParams.4.0.1.json',
},
{
filename: 'transitiveReferenceParams.json',
path: 'bulkExport/schema/transitiveReferenceParams.json',
},
];

if (event.RequestType === 'Create' || event.RequestType === 'Update') {
Expand Down
12 changes: 3 additions & 9 deletions integration-tests/bulkExport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import BulkExportTestHelper, { ExportStatusOutput } from './bulkExportTestHelper
import { getFhirClient } from './utils';
import createGroupMembersBundle from './createGroupMembersBundle.json';

const FIVE_MINUTES_IN_MS = 5 * 60 * 1000;
jest.setTimeout(FIVE_MINUTES_IN_MS);
const EIGHT_MINUTES_IN_MS = 8 * 60 * 1000;
jest.setTimeout(EIGHT_MINUTES_IN_MS);

const sleep = async (milliseconds: number) => {
return new Promise(resolve => setTimeout(resolve, milliseconds));
Expand Down Expand Up @@ -82,18 +82,12 @@ describe('Bulk Export', () => {
);

// OPERATE
const groupMembersAndPatientCompartment = Object.fromEntries(
Object.entries(resTypToResExpectedInExport).filter(([key]) => key !== 'Group'),
);
const groupId = resTypToResExpectedInExport.Group.id;
const statusPollUrl = await bulkExportTestHelper.startExportJob({ exportType: 'group', groupId });
const responseBody = await bulkExportTestHelper.getExportStatus(statusPollUrl);

// CHECK
return bulkExportTestHelper.checkResourceInExportedFiles(
responseBody.output,
groupMembersAndPatientCompartment,
);
return bulkExportTestHelper.checkResourceInExportedFiles(responseBody.output, resTypToResExpectedInExport);
});

test('Successfully export group members last updated after _since timestamp in a group last updated before the _since timestamp', async () => {
Expand Down
23 changes: 14 additions & 9 deletions integration-tests/bulkExportTestHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ export interface GroupMemberMeta {
inactive?: boolean;
}

export interface GroupMember {
entity: {
reference: string;
};
}

export default class BulkExportTestHelper {
FIVE_MINUTES_IN_MS = 5 * 60 * 1000;
EIGHT_MINUTES_IN_MS = 8 * 60 * 1000;

fhirUserAxios: AxiosInstance;

Expand Down Expand Up @@ -80,8 +86,8 @@ export default class BulkExportTestHelper {
}

async getExportStatus(statusPollUrl: string, expectedSubstring = ''): Promise<any> {
const fiveMinuteFromNow = new Date(new Date().getTime() + this.FIVE_MINUTES_IN_MS);
while (new Date().getTime() < fiveMinuteFromNow.getTime()) {
const cutOffTime = new Date(new Date().getTime() + this.EIGHT_MINUTES_IN_MS);
while (new Date().getTime() < cutOffTime.getTime()) {
try {
console.log('Checking export status');
// eslint-disable-next-line no-await-in-loop
Expand All @@ -99,7 +105,7 @@ export default class BulkExportTestHelper {
}
}
throw new Error(
`Expected export status did not occur during polling time frame of ${this.FIVE_MINUTES_IN_MS /
`Expected export status did not occur during polling time frame of ${this.EIGHT_MINUTES_IN_MS /
1000} seconds`,
);
}
Expand All @@ -121,11 +127,10 @@ export default class BulkExportTestHelper {

// Create group members with metadata
const group = createGroupBundle.entry.filter(entry => entry.resource.resourceType === 'Group')[0].resource;
const groupMemberReferences: string[] = createGroupBundle.entry
.filter(entry => ['Patient', 'Practitioner'].includes(entry.resource.resourceType))
.map(entry => entry.fullUrl);
group.member = groupMemberReferences.map(reference => ({
entity: { reference },
// @ts-ignore
const member: GroupMember[] = group.member || [];
group.member = member.map(entityObj => ({
...entityObj,
...groupMemberMeta,
})) as any[];

Expand Down
99 changes: 98 additions & 1 deletion integration-tests/createGroupMembersBundle.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,32 @@
"url": "Patient"
}
},
{
"fullUrl": "urn:uuid:fcfe413c-c62d-4097-9e31-02ff6ff578bc",
"resource":
{
"resourceType": "Medication",
"id": "med0310",
"ingredient": [
{
"itemCodeableConcept": {
"coding": [
{
"system": "http://snomed.info/sct",
"code": "387138002",
"display": "Busulfan (substance)"
}
]
}
}
]
},
"request":
{
"method": "POST",
"url": "Medication"
}
},
{
"fullUrl": "urn:uuid:e0352b49-8798-398c-8f10-2fc0648a268a",
"resource":
Expand Down Expand Up @@ -54,7 +80,14 @@
"resourceType": "Group",
"type": "person",
"actual": true,
"member": ["something"]
"member": [
{
"entity": {"reference": "urn:uuid:fcfe413c-c62d-4097-9e31-02ff6ff523ad"}
},
{
"entity": {"reference": "urn:uuid:fcfe413c-c62d-4097-9e31-02ff6ff578bc"}
}
]
},
"request":
{
Expand Down Expand Up @@ -110,6 +143,70 @@
"method": "POST",
"url": "Provenance"
}
},
{
"fullUrl": "urn:uuid:6ad9a6b5-44fb-4eae-9544-a36d5c05c999",
"resource": {
"resourceType": "CareTeam",
"id": "example",
"identifier": [
{
"value": "12345"
}
],
"status": "active",
"name": "Peter James Charlmers Care Plan for Inpatient Encounter",
"subject": {
"reference": "urn:uuid:fcfe413c-c62d-4097-9e31-02ff6ff523ad",
"display": "Peter James Chalmers"
},
"participant": [
{
"member": {
"reference": "urn:uuid:e0352b49-8798-398c-8f10-2fc0648a268a",
"display": "Julia"
}
}
],
"encounter": {
"reference": "Encounter/example"
},
"managingOrganization": [
{
"reference": "urn:uuid:e92f7839-c81b-4341-93c3-4c6460bd78dc"
}
]
},
"request": {
"method": "POST",
"url": "CareTeam"
}
},
{
"fullUrl": "urn:uuid:e92f7839-c81b-4341-93c3-4c6460bd78dc",
"resource": {
"resourceType": "Organization",
"id": "hl7",
"name": "Health Level Seven International",
"alias": [
"HL7 International"
],
"address": [
{
"line": [
"3300 Washtenaw Avenue, Suite 227"
],
"city": "Ann Arbor",
"state": "MI",
"postalCode": "48104",
"country": "USA"
}
]
},
"request": {
"method": "POST",
"url": "Organization"
}
}
]
}
Loading

0 comments on commit 3c4c57e

Please sign in to comment.