Skip to content

Commit

Permalink
ALS-7810: Add data dictionary table to PFB output (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 authored Nov 20, 2024
1 parent a951387 commit 5365a09
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.util.Map;

/**
 * A single data-dictionary concept as returned by the dictionary service.
 * Deserialized from JSON; unknown response properties are ignored so the
 * record stays compatible with newer dictionary API versions.
 *
 * @param conceptPath the concept path used as a lookup key (e.g. {@code \demographics\age\})
 * @param name        short concept name
 * @param display     human-readable display label
 * @param dataset     owning dataset identifier; may be null
 * @param description free-text description; may be null
 * @param meta        additional metadata; a "drs_uri" entry, when present, holds a JSON string array of DRS URIs
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public record Concept(String conceptPath, String name, String display, String dataset, String description, Map<String, String> meta) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Service
@ConditionalOnProperty("dictionary.host")
public class DictionaryService {

    /** Response type of the concept-detail endpoint: a JSON array of {@link Concept}. */
    public static final ParameterizedTypeReference<List<Concept>> CONCEPT_LIST_TYPE_REFERENCE = new ParameterizedTypeReference<>() {
    };
    private final String dictionaryHost;
    private final RestTemplate restTemplate;

    /**
     * @param dictionaryHostTemplate dictionary host URL, optionally containing the
     *                               {@code ___TARGET_STACK___} placeholder
     * @param targetStack            stack identifier substituted into the placeholder;
     *                               when blank the template is used unchanged
     */
    @Autowired
    public DictionaryService(@Value("${dictionary.host}") String dictionaryHostTemplate, @Value("${TARGET_STACK:}") String targetStack) {
        boolean stackProvided = targetStack != null && !targetStack.isEmpty();
        this.dictionaryHost = stackProvided
            ? dictionaryHostTemplate.replace("___TARGET_STACK___", targetStack)
            : dictionaryHostTemplate;
        this.restTemplate = new RestTemplate();
    }

    /**
     * Fetches concept details for the given concept paths from the dictionary service.
     *
     * @param conceptPaths concept paths to look up
     * @return the concepts returned by the service
     */
    public List<Concept> getConcepts(List<String> conceptPaths) {
        String conceptDetailUrl = dictionaryHost + "/pic-sure-api-2/PICSURE/proxy/dictionary-api/concepts/detail";
        HttpEntity<List<String>> requestEntity = new HttpEntity<>(conceptPaths);
        return restTemplate
            .exchange(conceptDetailUrl, HttpMethod.POST, requestEntity, CONCEPT_LIST_TYPE_REFERENCE)
            .getBody();
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.io;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
Expand All @@ -16,34 +20,43 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PfbWriter implements ResultWriter {

public static final String PATIENT_TABLE_PREFIX = "pic-sure-";
public static final String PATIENT_TABLE_PREFIX = "pic-sure-patients-";
public static final String DATA_DICTIONARY_TABLE_PREFIX = "pic-sure-data-dictionary-";
private Logger log = LoggerFactory.getLogger(PfbWriter.class);

private final DictionaryService dictionaryService;

private final Schema metadataSchema;
private final Schema nodeSchema;

private final String queryId;

private final String patientTableName;
private final String dataDictionaryTableName;
private SchemaBuilder.FieldAssembler<Schema> entityFieldAssembler;

private List<String> fields;
private List<String> originalFields;
private List<String> formattedFields;
private DataFileWriter<GenericRecord> dataFileWriter;
private File file;
private Schema entitySchema;
private Schema patientDataSchema;
private Schema dataDictionarySchema;
private Schema relationSchema;

private static final Set<String> SINGULAR_FIELDS = Set.of("patient_id");

public PfbWriter(File tempFile, String queryId) {
public PfbWriter(File tempFile, String queryId, DictionaryService dictionaryService) {
this.file = tempFile;
this.queryId = queryId;
this.dictionaryService = dictionaryService;
this.patientTableName = formatFieldName(PATIENT_TABLE_PREFIX + queryId);
this.dataDictionaryTableName = formatFieldName(DATA_DICTIONARY_TABLE_PREFIX + queryId);
entityFieldAssembler = SchemaBuilder.record("entity")
.namespace("edu.harvard.dbmi")
.fields();
Expand Down Expand Up @@ -71,11 +84,21 @@ public PfbWriter(File tempFile, String queryId) {

@Override
public void writeHeader(String[] data) {
fields = Arrays.stream(data.clone()).map(this::formatFieldName).collect(Collectors.toList());
originalFields = List.of(data);
formattedFields = originalFields.stream().map(this::formatFieldName).collect(Collectors.toList());

dataDictionarySchema = SchemaBuilder.record(dataDictionaryTableName)
.fields()
.requiredString("concept_path")
.name("drs_uri").type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault()
.nullableString("display", "null")
.nullableString("dataset", "null")
.nullableString("description", "null")
.endRecord();

SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record(patientTableName)
.fields();

fields.forEach(field -> {
formattedFields.forEach(field -> {
if (isSingularField(field)) {
patientRecords.nullableString(field, "null");
} else {
Expand All @@ -85,7 +108,7 @@ public void writeHeader(String[] data) {
});
patientDataSchema = patientRecords.endRecord();

Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema);
Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema, dataDictionarySchema);

entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault();
entityFieldAssembler.nullableString("id", "null");
Expand All @@ -104,6 +127,60 @@ public void writeHeader(String[] data) {
}

writeMetadata();
writeDataDictionary();
}

/**
 * Writes one row per exported field (excluding patient_id) to the data dictionary
 * table, enriching each row with display/dataset/description and DRS URIs fetched
 * from the dictionary service. A dictionary outage is non-fatal: rows are then
 * written with the concept path only.
 */
private void writeDataDictionary() {
    // Note: original had a stray double semicolon here; removed.
    GenericRecord entityRecord = new GenericData.Record(entitySchema);
    Map<String, Concept> conceptMap = Map.of();
    try {
        conceptMap = dictionaryService.getConcepts(originalFields).stream()
                .collect(Collectors.toMap(Concept::conceptPath, Function.identity()));
    } catch (RuntimeException e) {
        // Best effort: do not fail the whole export if the dictionary service is down.
        log.error("Error fetching concepts from dictionary service", e);
    }

    // Create the mapper once; ObjectMapper construction is expensive and the
    // original allocated a new one per field inside the loop.
    ObjectMapper objectMapper = new ObjectMapper();

    for (int i = 0; i < formattedFields.size(); i++) {
        String formattedField = formattedFields.get(i);
        if ("patient_id".equals(formattedField)) {
            continue; // patient_id is the row key, not a dictionary concept
        }
        GenericRecord dataDictionaryData = new GenericData.Record(dataDictionarySchema);
        dataDictionaryData.put("concept_path", formattedField);

        // Concepts are keyed by the original (unformatted) concept path.
        Concept concept = conceptMap.get(originalFields.get(i));
        List<String> drsUris = List.of();
        if (concept != null) {
            drsUris = parseDrsUris(objectMapper, concept.meta());
            dataDictionaryData.put("display", concept.display());
            dataDictionaryData.put("dataset", concept.dataset());
            dataDictionaryData.put("description", concept.description());
        }
        dataDictionaryData.put("drs_uri", drsUris);

        // Parameterized logging (avoids string concat when info is disabled);
        // also fixes the "dictonary" typo in the original message.
        log.info("Writing {} to data dictionary table with drs_uris: {}", formattedField, drsUris);
        entityRecord.put("object", dataDictionaryData);
        entityRecord.put("name", dataDictionaryTableName);
        entityRecord.put("id", formattedField);
        entityRecord.put("relations", List.of());

        try {
            dataFileWriter.append(entityRecord);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/**
 * Parses the "drs_uri" meta entry (a JSON string array) into a list.
 * Returns an empty list when meta is null, the entry is absent, or parsing fails.
 */
private List<String> parseDrsUris(ObjectMapper objectMapper, Map<String, String> meta) {
    if (meta == null) {
        return List.of();
    }
    String drsUriJson = meta.get("drs_uri");
    if (drsUriJson == null) {
        return List.of();
    }
    try {
        return List.of(objectMapper.readValue(drsUriJson, String[].class));
    } catch (JsonProcessingException e) {
        log.error("Error parsing drs_uri as json: {}", drsUriJson);
        return List.of();
    }
}

private boolean isSingularField(String field) {
Expand All @@ -126,7 +203,7 @@ private void writeMetadata() {
GenericRecord entityRecord = new GenericData.Record(entitySchema);

List<GenericRecord> nodeList = new ArrayList<>();
for (String field : fields) {
for (String field : formattedFields) {
GenericRecord nodeData = new GenericData.Record(nodeSchema);
nodeData.put("name", field);
nodeData.put("ontology_reference", "");
Expand Down Expand Up @@ -158,21 +235,21 @@ public void writeEntity(Collection<String[]> entities) {
@Override
public void writeMultiValueEntity(Collection<List<List<String>>> entities) {
entities.forEach(entity -> {
if (entity.size() != fields.size()) {
if (entity.size() != formattedFields.size()) {
throw new IllegalArgumentException("Entity length much match the number of fields in this document");
}
GenericRecord patientData = new GenericData.Record(patientDataSchema);
String patientId = "";
for(int i = 0; i < fields.size(); i++) {
if ("patient_id".equals(fields.get(i))) {
for(int i = 0; i < formattedFields.size(); i++) {
if ("patient_id".equals(formattedFields.get(i))) {
patientId = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : "";
}
if (isSingularField(fields.get(i))) {
if (isSingularField(formattedFields.get(i))) {
String entityValue = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : "";
patientData.put(fields.get(i), entityValue);
patientData.put(formattedFields.get(i), entityValue);
} else {
List<String> fieldValue = entity.get(i) != null ? entity.get(i) : List.of();
patientData.put(fields.get(i), fieldValue);
patientData.put(formattedFields.get(i), fieldValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ public SignUrlService(

/**
 * Uploads the given file to the configured S3 bucket under the given object key.
 * The S3 client is scoped to this call; try-with-resources guarantees it is
 * closed even when the upload throws (the original leaked the client on failure).
 *
 * @param file      local file to upload
 * @param objectKey destination key within the bucket
 */
public void uploadFile(File file, String objectKey) {
    try (S3Client s3 = S3Client.builder()
            .region(this.region)
            .build()) {
        putS3Object(s3, bucketName, objectKey, file);
    }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

public class ConceptTest {

    /** Round-trips a Concept array through Jackson and verifies it survives unchanged. */
    @Test
    public void jsonSerialization() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> meta = Map.of("drs_uri", "[\"a-drs.uri\", \"another-drs.uri\"]");
        Concept[] original = new Concept[]{new Concept("\\demographics\\age\\", "age", "AGE", null, "patient age", meta)};

        String json = mapper.writeValueAsString(original);
        Concept[] roundTripped = mapper.readValue(json, Concept[].class);

        assertEquals(List.of(original), List.of(roundTripped));
    }
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.io;

import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.*;


@ExtendWith(MockitoExtension.class)
public class PfbWriterTest {

@Mock
private DictionaryService dictionaryService;

@Test
public void writeValidPFB() {
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString());
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);

Mockito.when(dictionaryService.getConcepts(List.of("patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\")))
.thenReturn(List.of(new Concept("\\demographics\\age\\", "age", "AGE", null, "patient age", Map.of("drs_uri", "[\"a-drs.uri\", \"another-drs.uri\"]"))));

pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"});
List<List<String>> nullableList = new ArrayList<>();
Expand All @@ -34,26 +46,25 @@ public void writeValidPFB() {
List.of(List.of(), List.of("75"), List.of())
));
pfbWriter.close();
// todo: validate this programmatically
}

/** Spaces and backslashes in a concept path become underscores. */
@Test
public void formatFieldName_spacesAndBackslashes_replacedWithUnderscore() {
    PfbWriter writer = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);
    assertEquals("_Topmed_Study_Accession_with_Subject_ID__",
            writer.formatFieldName("\\Topmed Study Accession with Subject ID\\\\"));
}

/** A leading digit gets an underscore prepended so the name is Avro-legal. */
@Test
public void formatFieldName_startsWithDigit_prependUnderscore() {
    PfbWriter writer = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);
    assertEquals("_123Topmed_Study_Accession_with_Subject_ID__",
            writer.formatFieldName("123Topmed Study Accession with Subject ID\\\\"));
}

/** Arbitrary non-alphanumeric characters are each replaced with an underscore. */
@Test
public void formatFieldName_randomGarbage_replaceWithUnderscore() {
    PfbWriter writer = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);
    assertEquals("___my_garbage__vro_var_able_nam_",
            writer.formatFieldName("$$$my garbage @vro var!able nam#"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.stream.Collectors;

import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.PfbWriter;
import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class QueryService {
private final CountProcessor countProcessor;
private final MultiValueQueryProcessor multiValueQueryProcessor;

private final DictionaryService dictionaryService;

HashMap<String, AsyncResult> results = new HashMap<>();


Expand All @@ -57,6 +60,7 @@ public QueryService (AbstractProcessor abstractProcessor,
TimeseriesProcessor timeseriesProcessor,
CountProcessor countProcessor,
MultiValueQueryProcessor multiValueQueryProcessor,
@Autowired(required = false) DictionaryService dictionaryService,
@Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit,
@Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads,
@Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) {
Expand All @@ -65,6 +69,7 @@ public QueryService (AbstractProcessor abstractProcessor,
this.timeseriesProcessor = timeseriesProcessor;
this.countProcessor = countProcessor;
this.multiValueQueryProcessor = multiValueQueryProcessor;
this.dictionaryService = dictionaryService;

SMALL_JOB_LIMIT = smallJobLimit;
SMALL_TASK_THREADS = smallTaskThreads;
Expand Down Expand Up @@ -136,7 +141,7 @@ private AsyncResult initializeResult(Query query) throws IOException {
String queryId = UUIDv5.UUIDFromString(query.toString()).toString();
ResultWriter writer;
if (ResultType.DATAFRAME_PFB.equals(query.getExpectedResultType())) {
writer = new PfbWriter(File.createTempFile("result-" + System.nanoTime(), ".avro"), queryId);
writer = new PfbWriter(File.createTempFile("result-" + System.nanoTime(), ".avro"), queryId, dictionaryService);
} else {
writer = new CsvWriter(File.createTempFile("result-" + System.nanoTime(), ".sstmp"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ HPDS_GENOMIC_DATA_DIRECTORY=/opt/local/hpds/all/

data-export.s3.bucket-name=pic-sure-auth-dev-data-export
data-export.s3.region=us-east-1
data-export.s3.signedUrl-expiry-minutes=60
data-export.s3.signedUrl-expiry-minutes=60

dictionary.host = http://wildfly.___TARGET_STACK___:8080/
Loading

0 comments on commit 5365a09

Please sign in to comment.