
Merge pull request #312 from helxplatform/develop
Release candidate
YaphetKG authored Aug 21, 2023
2 parents 3b3c66f + 8e8de2d commit 626de70
Showing 16 changed files with 531 additions and 122 deletions.
Binary file modified data/bdc_dbgap_data_dicts.tar.gz
Binary file removed data/redis/appendonly.aof
5 changes: 3 additions & 2 deletions requirements.txt
@@ -1,6 +1,6 @@
elasticsearch[async]==7.16.3
fastapi==0.95.0
uvicorn
uvicorn==0.23.2
gunicorn
itsdangerous
Jinja2
@@ -19,5 +19,6 @@ six==1.16.0

# Click for command line arguments
# We use Click 7.0 because that's what one of the pinned packages above uses.
click~=7.0
click
httpx>=0.24.1
bmt==1.1.0
8 changes: 5 additions & 3 deletions src/dug/config.py
@@ -39,12 +39,12 @@ class Config:

# Normalizer config that will be passed to annotate.Normalizer constructor
normalizer: dict = field(default_factory=lambda: {
"url": "https://nodenormalization-sri.renci.org/get_normalized_nodes?conflate=false&curie="
"url": "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes?conflate=false&description=true&curie="
})

# Synonym service config that will be passed to annotate.SynonymHelper constructor
synonym_service: dict = field(default_factory=lambda: {
"url": "https://onto.renci.org/synonyms/"
"url": "https://name-resolution-sri.renci.org/reverse_lookup"
})

# Ontology metadata helper config that will be passed to annotate.OntologyHelper constructor
@@ -59,7 +59,9 @@ class Config:
"disease": ["disease", "phenotypic_feature"],
"pheno": ["phenotypic_feature", "disease"],
"anat": ["disease", "anatomical_entity"],
"chem_to_disease": ["chemical_substance", "disease"],
"chem_to_disease": ["chemical_entity", "disease"],
"small_molecule_to_disease": ["small_molecule", "disease"],
"chemical_mixture_to_disease": ["chemical_mixture", "disease"],
"phen_to_anat": ["phenotypic_feature", "anatomical_entity"],
})

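The normalizer and synonym_service settings are plain dicts supplied through dataclass default_factory fields, so a deployment can point Dug at different endpoints without editing the module. A minimal sketch of that pattern, not part of the commit (the field names mirror the diff; the override URL is made up):

from dataclasses import dataclass, field

@dataclass
class ToyConfig:
    # Mutable defaults must go through default_factory so each instance gets its own dict.
    normalizer: dict = field(default_factory=lambda: {
        "url": "https://nodenormalization-dev.apps.renci.org/get_normalized_nodes"
               "?conflate=false&description=true&curie="
    })
    synonym_service: dict = field(default_factory=lambda: {
        "url": "https://name-resolution-sri.renci.org/reverse_lookup"
    })

cfg = ToyConfig(synonym_service={"url": "http://localhost:2433/reverse_lookup"})  # made-up override
print(cfg.normalizer["url"])
print(cfg.synonym_service["url"])
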
73 changes: 26 additions & 47 deletions src/dug/core/annotate.py
@@ -1,8 +1,10 @@
import json
import logging
import os
import re
import urllib.parse
from typing import TypeVar, Generic, Union, List, Tuple, Optional
import bmt
import requests
from requests import Session

@@ -59,14 +61,12 @@ def __init__(
annotator: "Annotator",
normalizer: "Normalizer",
synonym_finder: "SynonymFinder",
ontology_helper: "OntologyHelper",
ontology_greenlist=[],
):
self.preprocessor = preprocessor
self.annotator = annotator
self.normalizer = normalizer
self.synonym_finder = synonym_finder
self.ontology_helper = ontology_helper
self.ontology_greenlist = ontology_greenlist
self.norm_fails_file = "norm_fails.txt"
self.anno_fails_file = "anno_fails.txt"
@@ -106,12 +106,6 @@ def annotate(self, text, http_session):
# Add synonyms to identifier
norm_id.synonyms = self.synonym_finder.get_synonyms(norm_id.id, http_session)

# Get canonical label, name, and description from ontology metadata service
name, desc, ontology_type = self.ontology_helper.get_ontology_info(norm_id.id, http_session)
norm_id.label = name
norm_id.description = desc
norm_id.type = ontology_type

# Get pURL for ontology identifier for more info
norm_id.purl = BioLinkPURLerizer.get_curie_purl(norm_id.id)
processed_identifiers.append(norm_id)
@@ -335,6 +329,7 @@ def handle_response(self, value, response: dict) -> List[Identifier]:

class Normalizer(ApiClient[Identifier, Identifier]):
def __init__(self, url):
self.bl_toolkit = bmt.Toolkit()
self.url = url

def normalize(self, identifier: Identifier, http_session: Session):
@@ -380,9 +375,13 @@ def handle_response(self, identifier: Identifier, normalized: dict) -> Optional[
logger.debug(f"Preferred id: {preferred_id}")
identifier.id = preferred_id.get('identifier', '')
identifier.label = preferred_id.get('label', '')
identifier.equivalent_identifiers = [v['identifier'] for v in equivalent_identifiers]
identifier.types = biolink_type

identifier.description = preferred_id.get('description', '')
identifier.equivalent_identifiers = [v['identifier'] for v in equivalent_identifiers]
try:
identifier.types = self.bl_toolkit.get_element(biolink_type[0]).name
except:
# converts biolink:SmallMolecule to small molecule
identifier.types = (" ".join(re.split("(?=[A-Z])", biolink_type[0].replace('biolink:', ''))[1:])).lower()
return identifier
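
(Aside, not part of the diff.) The normalizer now asks the Biolink Model Toolkit (bmt) for a human-readable element name and, if that lookup raises, falls back to splitting the CamelCase class name. A small sketch of just the fallback conversion; to_readable_type is a made-up helper and the bmt call is omitted:

import re

def to_readable_type(biolink_type: str) -> str:
    # Hypothetical helper: "biolink:SmallMolecule" -> "small molecule".
    bare = biolink_type.replace("biolink:", "")
    # Splitting on a lookahead keeps each capitalised word: ['', 'Small', 'Molecule'].
    words = re.split("(?=[A-Z])", bare)[1:]
    return " ".join(words).lower()

assert to_readable_type("biolink:SmallMolecule") == "small molecule"
assert to_readable_type("biolink:ChemicalEntity") == "chemical entity"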


@@ -400,51 +399,31 @@ def get_synonyms(self, curie: str, http_session):
return self(curie, http_session)

def make_request(self, curie: str, http_session: Session):

# Get response from synonym service
url = f"{self.url}{urllib.parse.quote(curie)}"

# Get response from namelookup reverse lookup op
# example (https://name-resolution-sri.renci.org/docs#/lookup/lookup_names_reverse_lookup_post)
url = f"{self.url}"
payload = {
'curies': [curie]
}
try:
response = http_session.get(url)
if response.status_code == 400:
logger.error(f"No synonyms returned for: `{curie}`. Validation error.")
return []
if response.status_code == 500:
logger.error(f"No synonyms returned for: `{curie}`. Internal server error from {self.url}.")
return []
response = http_session.post(url, json=payload)
if str(response.status_code).startswith('4'):
logger.error(f"No synonyms returned for: `{curie}`. Validation error: {response.text}")
return {curie: []}
if str(response.status_code).startswith('5'):
logger.error(f"No synonyms returned for: `{curie}`. Internal server error from {self.url}. Error: {response.text}")
return {curie: []}
return response.json()
except json.decoder.JSONDecodeError as e:
logger.error(f"Json parse error for response from `{url}`. Exception: {str(e)}")
return []
return {curie: []}

def handle_response(self, curie: str, raw_synonyms: List[dict]) -> List[str]:
# List comprehension unpack all synonyms into a list
return [synonym['desc'] for synonym in raw_synonyms]
# Return curie synonyms
return raw_synonyms.get(curie, [])


class OntologyHelper(ApiClient[str, Tuple[str, str, str]]):
def __init__(self, url):
self.url = url

def make_request(self, curie: str, http_session: Session):
url = f"{self.url}{urllib.parse.quote(curie)}"
try:
response = http_session.get(url).json()
return response
except json.decoder.JSONDecodeError as e:
logger.error(f"No labels returned for: {curie}")
return {}

def handle_response(self, curie: str, response: dict) -> Tuple[str,str,str]:
# List comprehension for synonyms
name = response.get('label', '')
description = '' if not response.get('description', None) else response.get('description', '')
ontology_type = '' if not response.get('category', None) else response.get('category', '')[0]

return name, description, ontology_type

def get_ontology_info(self, curie, http_session):
return self(curie, http_session)


class BioLinkPURLerizer:
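
SynonymFinder now POSTs a batch of CURIEs to the Name Resolution service's reverse-lookup operation and reads synonyms out of a dict keyed by CURIE, replacing the per-identifier GET against onto.renci.org. A standalone sketch of that call, not part of the commit (error handling trimmed; the {curie: [synonym, ...]} response shape is assumed from the diff's handle_response):

import requests

def reverse_lookup(curies, url="https://name-resolution-sri.renci.org/reverse_lookup"):
    # Return {curie: [synonym, ...]} for each requested CURIE.
    response = requests.post(url, json={"curies": list(curies)}, timeout=30)
    if not response.ok:
        # Mirror the diff's behaviour: return empty synonym lists rather than raise.
        return {curie: [] for curie in curies}
    return response.json()

synonyms = reverse_lookup(["MONDO:0004979"])
print(synonyms.get("MONDO:0004979", [])[:5])
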
19 changes: 13 additions & 6 deletions src/dug/core/async_search.py
@@ -212,15 +212,10 @@ async def search_concepts(self, query, offset=0, size=None, types=None,
Changed to a long boolean match query to optimize search results
"""
query_dict = self._build_concepts_query(query, **kwargs)
total_items = await self.es.count(
body={"query": query_dict},
index="concepts_index")
# Get aggregated counts of biolink types
search_body = {"query": query_dict}
search_body['aggs'] = {'type-count': {'terms': {'field': 'type'}}}
# Add post_filter on types
if types:
assert isinstance(types, list)
if isinstance(types, list):
search_body['post_filter'] = {
"bool": {
"should": [
@@ -239,6 +234,18 @@
size=size,
explain=True
)
# Aggs/post_filter aren't supported by count
del search_body["aggs"]
if "post_filter" in search_body:
# We'll move the post_filter into the actual filter
search_body["query"]["bool"]["filter"]["bool"].update(
search_body["post_filter"]["bool"]
)
del search_body["post_filter"]
total_items = await self.es.count(
body=search_body,
index="concepts_index"
)

# Simplify the data structure we get from aggregations to put into the
# return value. This should be a count of documents hit for every type
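
Elasticsearch's _count API does not accept aggregations or post_filter, so before reusing the search body for the total count the new code drops aggs and folds the post_filter clause into the query's own bool filter; the count then honours the same type restriction as the search. A toy sketch of that transformation on plain dicts, not part of the commit (the body layout is an assumption modelled on the diff):

import copy

def count_body_from_search_body(search_body: dict) -> dict:
    # Strip the parts the _count endpoint rejects while keeping the type filter.
    body = copy.deepcopy(search_body)
    body.pop("aggs", None)                       # counts cannot aggregate
    post_filter = body.pop("post_filter", None)  # counts cannot post-filter
    if post_filter:
        body["query"]["bool"]["filter"]["bool"].update(post_filter["bool"])
    return body

search_body = {
    "query": {"bool": {"must": [], "filter": {"bool": {"must": []}}}},
    "aggs": {"type-count": {"terms": {"field": "type"}}},
    "post_filter": {"bool": {"should": [{"term": {"type": "disease"}}]}},
}
print(count_body_from_search_body(search_body))
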
12 changes: 7 additions & 5 deletions src/dug/core/factory.py
@@ -4,8 +4,12 @@
from requests_cache import CachedSession

import dug.core.tranql as tql
from dug.core.annotate import DugAnnotator, Annotator, Normalizer, OntologyHelper, Preprocessor, SynonymFinder, \
ConceptExpander
from dug.core.annotate import (DugAnnotator,
Annotator,
Normalizer,
Preprocessor,
SynonymFinder,
ConceptExpander)
from dug.config import Config as DugConfig, TRANQL_SOURCE
from dug.core.crawler import Crawler
from dug.core.parsers import Parser
@@ -53,14 +57,12 @@ def build_annotator(self) -> DugAnnotator:
annotator = Annotator(**self.config.annotator)
normalizer = Normalizer(**self.config.normalizer)
synonym_finder = SynonymFinder(**self.config.synonym_service)
ontology_helper = OntologyHelper(**self.config.ontology_helper)

annotator = DugAnnotator(
preprocessor=preprocessor,
annotator=annotator,
normalizer=normalizer,
synonym_finder=synonym_finder,
ontology_helper=ontology_helper
synonym_finder=synonym_finder
)

return annotator
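
With OntologyHelper removed, the factory assembles DugAnnotator from four collaborators. A condensed sketch of that wiring, not part of the commit (the default Config() construction and the bare Preprocessor() call are assumptions; that part of the factory is outside the diff):

from dug.config import Config
from dug.core.annotate import (DugAnnotator,
                               Annotator,
                               Normalizer,
                               Preprocessor,
                               SynonymFinder)

config = Config()  # assumption: defaults are enough for annotation-only use
annotator = DugAnnotator(
    preprocessor=Preprocessor(),  # assumption: default construction
    annotator=Annotator(**config.annotator),
    normalizer=Normalizer(**config.normalizer),
    synonym_finder=SynonymFinder(**config.synonym_service),
)
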
16 changes: 13 additions & 3 deletions src/dug/core/index.py
@@ -26,6 +26,7 @@ def __init__(self, cfg: Config, indices=None):

self.es = Elasticsearch(hosts=self.hosts,
http_auth=(self._cfg.elastic_username, self._cfg.elastic_password))
self.replicas = self.get_es_node_count()

if self.es.ping():
logger.info('connected to elasticsearch')
@@ -36,6 +37,10 @@
raise SearchException(
message='failed to connect to elasticsearch',
details=f"connecting to host {self._cfg.elastic_host} and port {self._cfg.elastic_port}")

def get_es_node_count(self):
return self.es.nodes.info()["_nodes"]["total"]


def init_indices(self):
# The concepts and variable indices include an analyzer that utilizes the english
@@ -49,7 +54,7 @@ def init_indices(self):
kg_index = {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
"number_of_replicas": self.replicas
},
"mappings": {
"properties": {
@@ -66,7 +71,7 @@
"settings": {
"index.mapping.coerce": "false",
"number_of_shards": 1,
"number_of_replicas": 0,
"number_of_replicas": self.replicas,
"analysis": {
"analyzer": {
"std_with_stopwords": {
@@ -104,7 +109,7 @@
"settings": {
"index.mapping.coerce": "false",
"number_of_shards": 1,
"number_of_replicas": 0,
"number_of_replicas": self.replicas,
"analysis": {
"analyzer": {
"std_with_stopwords": {
@@ -148,6 +153,11 @@ def init_indices(self):
for index in self.indices:
try:
if self.es.indices.exists(index=index):
# if index exists check if replication is good
index_replicas = self.es.indices.get_settings(index=index)[index]["settings"]["index"]["number_of_replicas"]
if index_replicas != self.replicas:
self.es.indices.put_settings(index=index, body={"number_of_replicas": (self.replicas - 1) or 1 })
self.es.indices.refresh(index=index)
logger.info(f"Ignoring index {index} which already exists.")
else:
result = self.es.indices.create(
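
The indexer now sizes number_of_replicas from the cluster's node count instead of hard-coding zero, and raises the setting on indices that already exist. A rough standalone sketch of the same idea against the Python Elasticsearch client, not part of the commit (host and index names are placeholders, and the replica arithmetic here is a simplification of the diff's (replicas - 1) or 1 expression):

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])  # placeholder host

node_count = es.nodes.info()["_nodes"]["total"]
desired_replicas = max(node_count - 1, 1)  # simplified: one replica per extra node, at least one

for index in ("concepts_index", "kg_index"):  # placeholder index names
    if not es.indices.exists(index=index):
        continue
    settings = es.indices.get_settings(index=index)
    current = int(settings[index]["settings"]["index"]["number_of_replicas"])
    if current != desired_replicas:
        es.indices.put_settings(index=index, body={"number_of_replicas": desired_replicas})
        es.indices.refresh(index=index)
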
33 changes: 28 additions & 5 deletions src/dug/core/parsers/dbgap_parser.py
@@ -1,9 +1,10 @@
import logging
import re
import re, os
from typing import List
from xml.etree import ElementTree as ET

from dug import utils as utils
from pathlib import Path
from ._base import DugElement, FileParser, Indexable, InputFile

logger = logging.getLogger('dug')
@@ -13,27 +14,49 @@ class DbGaPParser(FileParser):
# Class for parsing DbGaP Data dictionaries into a set of Dug Elements

@staticmethod
def parse_study_name_from_filename(filename: str):
def parse_study_name_from_filename(filename: str) -> str:
# Parse the study name from the xml filename if it exists. Return None if the filename isn't in the right format to extract the id from.
dbgap_file_pattern = re.compile(r'.*/*phs[0-9]+\.v[0-9]+\.pht[0-9]+\.v[0-9]+\.(.+)\.data_dict.*')
match = re.match(dbgap_file_pattern, filename)
if match is not None:
return match.group(1)
return None

@staticmethod
def parse_study_name_from_gap_exchange_file(filepath: Path) -> str:
# Parse the study name from the GapExchange file adjacent to the file passed in
parent_dir = filepath.parent.absolute()
gap_exchange_filename_str = "GapExchange_" + parent_dir.name
gap_exchange_filepath = None
for item in os.scandir(parent_dir):
if item.is_file and gap_exchange_filename_str in item.name:
gap_exchange_filepath = item.path
if gap_exchange_filepath is None:
return None
tree = ET.parse(gap_exchange_filepath, ET.XMLParser(encoding='iso-8859-5'))
tree_root = tree.getroot()
return tree_root.find("./Studies/Study/Configuration/StudyNameEntrez").text


def _get_element_type(self):
return "DbGaP"

def __call__(self, input_file: InputFile) -> List[Indexable]:
logger.debug(input_file)
if "GapExchange" in str(input_file).split("/")[-1]:
msg = f"Skipping parsing for GapExchange file: {input_file}!"
logger.info(msg)
return []
tree = ET.parse(input_file, ET.XMLParser(encoding='iso-8859-5'))
root = tree.getroot()
study_id = root.attrib['study_id']
participant_set = root.get('participant_set','0')

# Parse study name from file handle
study_name = self.parse_study_name_from_filename(str(input_file))

# Parse study name from GapExchange file, and if that fails try from file handle
# If still None, raise an error message
study_name = self.parse_study_name_from_gap_exchange_file(Path(input_file))
if study_name is None:
study_name = self.parse_study_name_from_filename(str(input_file))
if study_name is None:
err_msg = f"Unable to parse DbGaP study name from data dictionary: {input_file}!"
logger.error(err_msg)
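
The parser now prefers the study name recorded in the adjacent GapExchange XML (the StudyNameEntrez element) and only falls back to the filename pattern when that lookup fails. A self-contained sketch of the XPath lookup, not part of the commit (the inline XML is a made-up minimal stand-in, not a real dbGaP export):

from xml.etree import ElementTree as ET

# Made-up, minimal stand-in for a GapExchange_* file.
GAP_EXCHANGE_XML = """
<GaPExchange>
  <Studies>
    <Study>
      <Configuration>
        <StudyNameEntrez>Example Cohort Study (phs000000)</StudyNameEntrez>
      </Configuration>
    </Study>
  </Studies>
</GaPExchange>
"""

root = ET.fromstring(GAP_EXCHANGE_XML)
study_name = root.find("./Studies/Study/Configuration/StudyNameEntrez").text
print(study_name)  # Example Cohort Study (phs000000)
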
(Remaining changed files in this commit are not shown.)
