diff --git a/.env b/.env index 925049e8..d2b79878 100644 --- a/.env +++ b/.env @@ -1,12 +1,5 @@ DATA_DIR=./local_storage -NEO4J_PASSWORD=15707 -NEO4J_HOST=neo4j -NEO4J_CPU_LIMIT=2 -NEO4J_CPU_RESERVATION=1 -NEO4J_MEM_LIMIT=3G -NEO4J_MEM_RESERVATION=2G - ELASTIC_PASSWORD=15707 ELASTIC_API_HOST=elasticsearch ELASTIC_USERNAME=elastic diff --git a/.env.template b/.env.template index afab228d..81ad446f 100644 --- a/.env.template +++ b/.env.template @@ -1,12 +1,5 @@ DATA_DIR=$DATA_DIR -NEO4J_PASSWORD=$RANDOM -NEO4J_HOST=neo4j -NEO4J_CPU_LIMIT=2 -NEO4J_CPU_RESERVATION=1 -NEO4J_MEM_LIMIT=3G -NEO4J_MEM_RESERVATION=2G - ELASTIC_PASSWORD=$RANDOM ELASTIC_API_HOST=elasticsearch ELASTIC_USERNAME=elastic diff --git a/.gitignore b/.gitignore index 259314ab..e347dc05 100644 --- a/.gitignore +++ b/.gitignore @@ -156,5 +156,4 @@ variable_file.json monarch_results.txt anno_fails.txt data/elastic/ -data/neo4j/ crawl/ diff --git a/Jenkinsfile b/Jenkinsfile index 025e0c03..4ded3865 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -17,12 +17,12 @@ pipeline { } stage('Publish') { when { - branch 'develop' + tag "release-*" } steps { sh ''' - make build.image - make publish.image + make build + make publish ''' } } diff --git a/README.md b/README.md index 0821f994..4647f686 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,6 @@ services from outside the container (but in a shell env), run: ```shell source .env export $(cut -d= -f1 .env) -export NEO4J_HOST=localhost export ELASTIC_API_HOST=localhost export REDIS_HOST=localhost ``` diff --git a/data/heal_data_dicts.tar.gz b/data/heal_data_dicts.tar.gz new file mode 100644 index 00000000..10fc2a7d Binary files /dev/null and b/data/heal_data_dicts.tar.gz differ diff --git a/docker-compose.yaml b/docker-compose.yaml index 86fb124c..d544cd5d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,7 +11,7 @@ version: '3.0' ## NOTE: To connect to a dug service running in docker machine, from your local ## development machine, you will need to follow the steps in the Quickstart ## section of the README.md and set/export the env vars with special attention -## paid to the env vars: NEO4J_HOST, ELASTIC_API_HOST, and REDIS_HOST. +## paid to the env vars: ELASTIC_API_HOST, and REDIS_HOST. ## ################################################################################# services: @@ -28,7 +28,6 @@ services: context: . depends_on: - elasticsearch - - neo4j - redis - nboost restart: always @@ -68,25 +67,6 @@ services: - '9200:9200' - '9300:9300' - ################################################################################# - ## - ## A graph database provides query over linked data to drive indexing. - ## - ################################################################################# - neo4j: - image: bitnami/neo4j:3.5.14 - networks: - - dug-network - environment: - - NEO4J_PASSWORD=$NEO4J_PASSWORD - - NEO4J_HOST=$HOSTNAME - volumes: - - $DATA_DIR/neo4j:/bitnami - ports: - - '7474:7474' - - '7473:7473' - - '7687:7687' - ################################################################################# ## ## A memory cache for results of high volume service requests. 
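With the neo4j service gone, running Dug against the containers from a local shell only needs the Elasticsearch and Redis host overrides kept in the README hunk above. A minimal sketch of that workflow follows; it assumes the `redis` and `nboost` services remain defined further down in docker-compose.yaml, since only their `depends_on` references appear in the excerpt:

```shell
# Start the remaining backing services; there is no neo4j service to bring up any more.
docker-compose up -d elasticsearch redis nboost

# Point a Dug process running outside Docker at those containers,
# mirroring the updated Quickstart section of the README.
source .env
export $(cut -d= -f1 .env)
export ELASTIC_API_HOST=localhost
export REDIS_HOST=localhost
```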
diff --git a/requirements.txt b/requirements.txt index afd690db..1649f797 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ elasticsearch==7.12.0 flake8==3.9.0 flasgger==0.9.4 Flask==1.1.1 -Flask-Cors==3.0.8 +Flask-Cors==3.0.9 Flask-RESTful==0.3.8 gunicorn==20.0.4 idna==2.8 @@ -17,8 +17,9 @@ Jinja2==2.11.3 jsonschema==3.2.0 MarkupSafe==1.1.1 mistune==0.8.4 +pluggy==0.13.1 pyrsistent==0.17.3 -pytest==5.4.0 +pytest==6.2.2 pytz==2021.1 PyYAML==5.4.1 redis==3.4.1 diff --git a/setup.cfg b/setup.cfg index 4ae56e5f..e0048f69 100644 --- a/setup.cfg +++ b/setup.cfg @@ -22,6 +22,7 @@ python_requires = >=3.7 include_package_data = true install_requires = elasticsearch>=7.0.0,<8.0.0 + pluggy requests requests_cache redis>=3.0.0 diff --git a/src/dug/__init__.py b/src/dug/__init__.py index 8dee4bf8..7df9f7aa 100644 --- a/src/dug/__init__.py +++ b/src/dug/__init__.py @@ -1 +1 @@ -from ._version import __version__ +from ._version import __version__ \ No newline at end of file diff --git a/src/dug/_version.py b/src/dug/_version.py index 159d48b8..dac77789 100644 --- a/src/dug/_version.py +++ b/src/dug/_version.py @@ -1 +1 @@ -__version__ = "2.0.1" +__version__ = "2.1.0" \ No newline at end of file diff --git a/src/dug/api.py b/src/dug/api.py index 05c61f1b..8cd4c7a9 100644 --- a/src/dug/api.py +++ b/src/dug/api.py @@ -1,16 +1,19 @@ import argparse import json -import jsonschema import logging import os import sys import traceback + +import jsonschema import yaml from flasgger import Swagger from flask import Flask, g, Response, request -from flask_restful import Api, Resource from flask_cors import CORS -from dug.core import Search +from flask_restful import Api, Resource + +from dug.config import Config +from dug.core.search import Search """ Defines the semantic search API @@ -44,14 +47,14 @@ def dug (): if not hasattr(g, 'dug'): - g.search = Search () + g.search = Search(Config.from_env()) return g.search - + class DugResource(Resource): """ Base class handler for Dug API requests. """ def __init__(self): self.specs = {} - + """ Functionality common to Dug services. 
""" def validate (self, request, component): return @@ -62,7 +65,7 @@ def validate (self, request, component): to_validate = self.specs["components"]["schemas"][component] try: app.logger.debug (f"--:Validating obj {json.dumps(request.json, indent=2)}") - app.logger.debug (f" schema: {json.dumps(to_validate, indent=2)}") + app.logger.debug (f" schema: {json.dumps(to_validate, indent=2)}") jsonschema.validate(request.json, to_validate) except jsonschema.exceptions.ValidationError as error: app.logger.error (f"ERROR: {str(error)}") diff --git a/src/dug/cli.py b/src/dug/cli.py index c01d7d69..5839d41f 100755 --- a/src/dug/cli.py +++ b/src/dug/cli.py @@ -6,7 +6,8 @@ import argparse import os -from dug.core import Dug, logger +from dug.config import Config +from dug.core import Dug, logger, DugFactory class KwargParser(argparse.Action): @@ -92,21 +93,30 @@ def get_argparser(): def crawl(args): - dug = Dug() + config = Config.from_env() + factory = DugFactory(config) + dug = Dug(factory) dug.crawl(args.target, args.parser_type, args.element_type) def search(args): - dug = Dug() + config = Config.from_env() + factory = DugFactory(config) + dug = Dug(factory) + # dug = Dug() response = dug.search(args.target, args.query, **args.kwargs) print(response) def datatypes(args): - dug = Dug() + config = Config.from_env() + factory = DugFactory(config) + dug = Dug(factory) + # dug = Dug() response = dug.info(args.target, **args.kwargs) + def status(args): print("Status check is not implemented yet!") diff --git a/src/dug/config.py b/src/dug/config.py index 65f63022..79c809ec 100644 --- a/src/dug/config.py +++ b/src/dug/config.py @@ -1,62 +1,96 @@ import os -import dug.tranql as tql - -# Redis cache config -redis_host = os.environ.get('REDIS_HOST', 'localhost') -redis_port = os.environ.get('REDIS_PORT', 6379) -redis_password = os.environ.get('REDIS_PASSWORD', '') - -# ElasticSearch config options -elasticsearch_host = os.environ.get('ELASTIC_API_HOST', 'localhost') -elasticsearch_port = os.environ.get('ELASTIC_API_PORT', 9200) - -# Preprocessor config that will be passed to annotate.Preprocessor constructor -preprocessor = { - "debreviator": { - "BMI": "body mass index" - }, - "stopwords": ["the"] -} - -# Annotator config that will be passed to annotate.Annotator constructor -annotator = { - 'url': "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" -} - -# Normalizer config that will be passed to annotate.Normalizer constructor -normalizer = { - 'url': "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie=" -} - -# Synonym service config that will be passed to annotate.SynonymHelper constructor -synonym_service = { - 'url': "https://onto.renci.org/synonyms/" -} - -# Ontology metadata helper config that will be passed to annotate.OntologyHelper constructor -ontology_helper = { - 'url': "https://api.monarchinitiative.org/api/bioentity/" -} - -# Redlist of identifiers not to expand via TranQL -tranql_exclude_identifiers = ["CHEBI:17336"] - -# TranQL queries used to expand identifiers -tranql_source = "/graph/gamma/quick" -tranql_queries = { - "disease": tql.QueryFactory(["disease", "phenotypic_feature"], tranql_source), - "pheno": tql.QueryFactory(["phenotypic_feature", "disease"], tranql_source), - "anat": tql.QueryFactory(["disease", "anatomical_entity"], tranql_source), - "chem_to_disease": tql.QueryFactory(["chemical_substance", "disease"], tranql_source), - 
"phen_to_anat": tql.QueryFactory(["phenotypic_feature", "anatomical_entity"], tranql_source), - #"anat_to_disease": tql.QueryFactory(["anatomical_entity", "disease"], tranql_source), - #"anat_to_pheno": tql.QueryFactory(["anatomical_entity", "phenotypic_feature"], tranql_source) -} - -concept_expander = { - 'url': "https://tranql.renci.org/tranql/query?dynamic_id_resolution=true&asynchronous=false", - 'min_tranql_score': 0.0 -} - -# List of ontology types that can be used even if they fail normalization -ontology_greenlist = ["PATO", "CHEBI", "MONDO", "UBERON", "HP", "MESH", "UMLS"] + +from dataclasses import dataclass, field + + +TRANQL_SOURCE: str = "/graph/gamma/quick" + + +@dataclass +class Config: + """ + TODO: Populate description + """ + elastic_password: str = "changeme" + redis_password: str = "changeme" + + elastic_host: str = "elasticsearch" + elastic_port: int = 9200 + elastic_username: str = "elastic" + + redis_host: str = "redis" + redis_port: int = 6379 + + nboost_host: str = "nboost" + nboost_port: int = 8000 + + # Preprocessor config that will be passed to annotate.Preprocessor constructor + preprocessor: dict = field(default_factory=lambda: { + "debreviator": { + "BMI": "body mass index" + }, + "stopwords": ["the"] + }) + + # Annotator config that will be passed to annotate.Annotator constructor + annotator: dict = field(default_factory=lambda: { + "url": "https://api.monarchinitiative.org/api/nlp/annotate/entities?min_length=4&longest_only=false&include_abbreviation=false&include_acronym=false&include_numbers=false&content=" + }) + + # Normalizer config that will be passed to annotate.Normalizer constructor + normalizer: dict = field(default_factory=lambda: { + "url": "https://nodenormalization-sri.renci.org/get_normalized_nodes?curie=" + }) + + # Synonym service config that will be passed to annotate.SynonymHelper constructor + synonym_service: dict = field(default_factory=lambda: { + "url": "https://onto.renci.org/synonyms/" + }) + + # Ontology metadata helper config that will be passed to annotate.OntologyHelper constructor + ontology_helper: dict = field(default_factory=lambda: { + "url": "https://api.monarchinitiative.org/api/bioentity/" + }) + + # Redlist of identifiers not to expand via TranQL + tranql_exclude_identifiers: list = field(default_factory=lambda: ["CHEBI:17336"]) + + tranql_queries: dict = field(default_factory=lambda: { + "disease": ["disease", "phenotypic_feature"], + "pheno": ["phenotypic_feature", "disease"], + "anat": ["disease", "anatomical_entity"], + "chem_to_disease": ["chemical_substance", "disease"], + "phen_to_anat": ["phenotypic_feature", "anatomical_entity"], + }) + + + concept_expander: dict = field(default_factory=lambda: { + "url": "https://tranql.renci.org/tranql/query?dynamic_id_resolution=true&asynchronous=false", + "min_tranql_score": 0.0 + }) + + # List of ontology types that can be used even if they fail normalization + ontology_greenlist: list = field(default_factory=lambda: ["PATO", "CHEBI", "MONDO", "UBERON", "HP", "MESH", "UMLS"]) + + @classmethod + def from_env(cls): + env_vars = { + "elastic_host": "ELASTIC_API_HOST", + "elastic_port": "ELASTIC_API_PORT", + "elastic_username": "ELASTIC_USERNAME", + "elastic_password": "ELASTIC_PASSWORD", + "redis_host": "REDIS_HOST", + "redis_port": "REDIS_PORT", + "redis_password": "REDIS_PASSWORD", + "nboost_host": "NBOOST_API_HOST", + "nboost_port": "NBOOST_API_PORT" + } + + kwargs = {} + + for kwarg, env_var in env_vars.items(): + env_value = os.environ.get(env_var) + if env_value: + 
kwargs[kwarg] = env_value + + return cls(**kwargs) diff --git a/src/dug/core.py b/src/dug/core.py deleted file mode 100644 index b01092a3..00000000 --- a/src/dug/core.py +++ /dev/null @@ -1,709 +0,0 @@ -import json -import logging -import os -import sys -import traceback -from functools import partial -from pathlib import Path - -import redis -import requests -from elasticsearch import Elasticsearch -from requests_cache import CachedSession - -import dug.annotate as anno -import dug.config as cfg -import dug.parsers as parsers - -logger = logging.getLogger('dug') -stdout_log_handler = logging.StreamHandler(sys.stdout) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -stdout_log_handler.setFormatter(formatter) -logger.addHandler(stdout_log_handler) - -logging.getLogger("elasticsearch").setLevel(logging.WARNING) - - -class SearchException(Exception): - def __init__(self, message, details): - self.message = message - self.details = details - - -class ParserNotFoundException(Exception): - pass - - -class Search: - """ Search - - 1. Lexical fuzziness; (a) misspellings - a function of elastic. - 2. Fuzzy ontologically; - (a) expand based on core queries - * phenotype->study - * phenotype->disease->study - * disease->study - * disease->phenotype->study - """ - - def __init__(self, host=os.environ.get('ELASTIC_API_HOST'), port=9200, - indices=['concepts_index', 'variables_index', 'kg_index']): - logger.debug(f"Connecting to elasticsearch host: {host} at port: {port}") - self.indices = indices - self.host = os.environ.get('ELASTIC_API_HOST', 'localhost') - self.username = os.environ.get('ELASTIC_USERNAME', 'elastic') - self.password = os.environ.get('ELASTIC_PASSWORD', 'changeme') - self.nboost_host = os.environ.get('NBOOST_API_HOST', 'nboost') - self.hosts = [ - { - 'host': self.host, - 'port': port - } - ] - logger.debug(f"Authenticating as user {self.username} to host:{self.hosts}") - self.es = Elasticsearch(hosts=self.hosts, - http_auth=(self.username, self.password)) - - if self.es.ping(): - logger.info('connected to elasticsearch') - self.init_indices() - else: - print(f"Unable to connect to elasticsearch at {host}:{port}") - logger.error(f"Unable to connect to elasticsearch at {host}:{port}") - raise SearchException( - message='failed to connect to elasticsearch', - details=f"connecting to host {host} and port {port}") - - def init_indices(self): - settings = {} - - # kg_index - settings['kg_index'] = { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 0 - }, - "mappings": { - "properties": { - "name": { - "type": "text" - }, - "type": { - "type": "text" - } - } - } - } - - # concepts_index - settings['concepts_index'] = { - "settings": { - "index.mapping.coerce": "false", - "number_of_shards": 1, - "number_of_replicas": 0 - }, - "mappings": { - "dynamic": "strict", - "properties": { - "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, - "name": {"type": "text"}, - "description": {"type": "text"}, - "type": {"type": "keyword"}, - "search_terms": {"type": "text"}, - "identifiers": { - "properties": { - "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, - "label": {"type": "text"}, - "equivalent_identifiers": {"type": "keyword"}, - "type": {"type": "keyword"}, - "synonyms": {"type": "text"} - } - }, - "optional_terms": {"type": "text"}, - "concept_action": {"type": "text"} - } - } - } - - # variables_index - settings['variables_index'] = { - "settings": { - "index.mapping.coerce": "false", - 
"number_of_shards": 1, - "number_of_replicas": 0 - }, - "mappings": { - "dynamic": "strict", - "properties": { - "element_id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, - "element_name": {"type": "text"}, - "element_desc": {"type": "text"}, - "element_action": {"type": "text"}, - "search_terms": {"type": "text"}, - "identifiers": {"type": "keyword"}, - "collection_id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, - "collection_name": {"type": "text"}, - "collection_desc": {"type": "text"}, - "collection_action": {"type": "text"}, - "data_type": {"type": "text", "fields": {"keyword": {"type": "keyword"}}} - # typed as keyword for bucket aggs - } - } - } - - logger.info(f"creating indices: {self.indices}") - for index in self.indices: - try: - if self.es.indices.exists(index=index): - logger.info(f"Ignoring index {index} which already exists.") - else: - result = self.es.indices.create( - index=index, - body=settings[index], - ignore=400) - logger.info(f"result created index {index}: {result}") - except Exception as e: - logger.error(f"exception: {e}") - raise e - - def index_doc(self, index, doc, doc_id): - self.es.index( - index=index, - id=doc_id, - body=doc) - - def update_doc(self, index, doc, doc_id): - self.es.update( - index=index, - id=doc_id, - body=doc - ) - - def search_concepts(self, index, query, offset=0, size=None, fuzziness=1, prefix_length=3): - """ - Changed to query_string for and/or and exact matches with quotations. - """ - query = { - 'query_string': { - 'query': query, - 'fuzziness': fuzziness, - 'fuzzy_prefix_length': prefix_length, - 'fields': ["name", "description", "search_terms", "optional_terms"], - 'quote_field_suffix': ".exact" - }, - } - body = json.dumps({'query': query}) - total_items = self.es.count(body=body, index=index) - search_results = self.es.search( - index=index, - body=body, - filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'], - from_=offset, - size=size - ) - search_results.update({'total_items': total_items['count']}) - return search_results - - def search_variables(self, index, concept, query, size=None, data_type=None, offset=0, fuzziness=1, - prefix_length=3): - """ - In variable seach, the concept MUST match one of the indentifiers in the list - The query can match search_terms (hence, "should") for ranking. - - Results Return - The search result is returned in JSON format {collection_id:[elements]} - - Filter - If a data_type is passed in, the result will be filtered to only contain - the passed-in data type. 
- - - """ - query = { - 'bool': { - 'must': { - "match": { - "identifiers": concept - } - }, - 'should': { - 'query_string': { - "query": query, - "fuzziness": fuzziness, - "fuzzy_prefix_length": prefix_length, - "default_field": "search_terms" - } - } - } - } - body = json.dumps({'query': query}) - search_results = self.es.search( - index=index, - body=body, - filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'], - from_=offset, - size=size - ) - - # Reformat Results - new_results = {} - for elem in search_results['hits']['hits']: - elem_s = elem['_source'] - elem_type = elem_s['data_type'] - if elem_type not in new_results: - new_results[elem_type] = {} - - elem_id = elem_s['element_id'] - coll_id = elem_s['collection_id'] - elem_info = { - "description": elem_s['element_desc'], - "e_link": elem_s['element_action'], - "id": elem_id, - "name": elem_s['element_name'] - } - - # Case: collection not in dictionary for given data_type - if coll_id not in new_results[elem_type]: - # initialize document - doc = {} - - # add information - doc['c_id'] = coll_id - doc['c_link'] = elem_s['collection_action'] - doc['c_name'] = elem_s['collection_name'] - doc['elements'] = [elem_info] - - # save document - new_results[elem_type][coll_id] = doc - - # Case: collection already in dictionary for given element_type; append elem_info. Assumes no duplicate elements - else: - new_results[elem_type][coll_id]['elements'].append(elem_info) - - # Flatten dicts to list - for i in new_results: - new_results[i] = list(new_results[i].values()) - - # Return results - if bool(data_type): - if data_type in new_results: - new_results = new_results[data_type] - else: - new_results = {} - return new_results - - def agg_data_type(self, index, size=0): - """ - In variable seach, the concept MUST match one of the indentifiers in the list - The query can match search_terms (hence, "should") for ranking. - """ - aggs = { - "data_type": { - "terms": { - "field": "data_type.keyword", - "size": 100 - } - } - } - body = json.dumps({'aggs': aggs}) - - search_results = self.es.search( - index=index, - body=body, - size=size - ) - data_type_list = [data_type['key'] for data_type in search_results['aggregations']['data_type']['buckets']] - search_results.update({'data type list': data_type_list}) - return data_type_list - - def search_kg(self, index, unique_id, query, offset=0, size=None, fuzziness=1, prefix_length=3): - """ - In knowledge graph search seach, the concept MUST match the unique ID - The query MUST match search_targets. The updated query allows for - fuzzy matching and for the default OR behavior for the query. - """ - query = { - "bool": { - "must": [ - {"term": { - "concept_id.keyword": unique_id - } - }, - {'query_string': { - "query": query, - "fuzziness": fuzziness, - "fuzzy_prefix_length": prefix_length, - "default_field": "search_targets" - } - } - ] - } - } - body = json.dumps({'query': query}) - total_items = self.es.count(body=body, index=index) - search_results = self.es.search( - index=index, - body=body, - filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'], - from_=offset, - size=size - ) - search_results.update({'total_items': total_items['count']}) - return search_results - - def search_nboost(self, index, query, offset=0, size=10, fuzziness=1): - """ - Query type is now 'query_string'. 
- query searches multiple fields - if search terms are surrounded in quotes, looks for exact matches in any of the fields - AND/OR operators are natively supported by elasticesarch queries - """ - nboost_query = { - 'nboost': { - 'uhost': f"{self.username}:{self.password}@{self.host}", - 'uport': self.hosts[0]['port'], - 'cvalues_path': '_source.description', - 'query_path': 'body.query.query_string.query', - 'size': size, - 'from': offset, - 'default_topk': size - }, - 'query': { - 'query_string': { - 'query': query, - 'fuzziness': fuzziness, - 'fields': ['name', 'description', 'instructions', 'search_targets', 'optional_targets'], - 'quote_field_suffix': ".exact" - } - } - } - - return requests.post(url=f"http://{self.nboost_host}:8000/{index}/_search", json=nboost_query).json() - - def index_concept(self, concept, index): - # Don't re-index if already in index - if self.es.exists(index, concept.id): - return - """ Index the document. """ - self.index_doc( - index=index, - doc=concept.get_searchable_dict(), - doc_id=concept.id) - - def index_element(self, elem, index): - if not self.es.exists(index, elem.id): - # If the element doesn't exist, add it directly - self.index_doc( - index=index, - doc=elem.get_searchable_dict(), - doc_id=elem.id) - else: - # Otherwise update to add any new identifiers that weren't there last time around - results = self.es.get(index, elem.id) - identifiers = results['_source']['identifiers'] + list(elem.concepts.keys()) - doc = {"doc": {}} - doc['doc']['identifiers'] = list(set(identifiers)) - self.update_doc(index=index, doc=doc, doc_id=elem.id) - - def index_kg_answer(self, concept_id, kg_answer, index, id_suffix=None): - - # Get search targets by extracting names/synonyms from non-curie nodes in answer knoweldge graph - search_targets = kg_answer.get_node_names(include_curie=False) - search_targets += kg_answer.get_node_synonyms(include_curie=False) - - # Create the Doc - doc = { - 'concept_id': concept_id, - 'search_targets': list(set(search_targets)), - 'knowledge_graph': kg_answer.get_kg() - } - - # Create unique ID - logger.debug("Indexing TranQL query answer...") - id_suffix = list(kg_answer.nodes.keys()) if id_suffix is None else id_suffix - unique_doc_id = f"{concept_id}_{id_suffix}" - - """ Index the document. 
""" - self.index_doc( - index=index, - doc=doc, - doc_id=unique_doc_id) - - -class Crawler: - def __init__(self, crawl_file, parser, annotator, - tranqlizer, tranql_queries, - http_session, exclude_identifiers=[], element_type=None): - - self.crawl_file = crawl_file - self.parser = parser - self.element_type = element_type - self.annotator = annotator - self.tranqlizer = tranqlizer - self.tranql_queries = tranql_queries - self.http_session = http_session - self.exclude_identifiers = exclude_identifiers - self.elements = [] - self.concepts = {} - self.crawlspace = "crawl" - - def make_crawlspace(self): - if not os.path.exists(self.crawlspace): - try: - os.makedirs(self.crawlspace) - except Exception as e: - print(f"-----------> {e}") - traceback.print_exc() - - def crawl(self): - - # Create directory for storing temporary results - self.make_crawlspace() - - # Read in elements from parser - self.elements = self.parser.parse(self.crawl_file) - - # Optionally coerce all elements to be a specific type - for element in self.elements: - if isinstance(element, parsers.DugElement) and self.element_type is not None: - element.type = self.element_type - - # Annotate elements - self.annotate_elements() - - # Expand concepts - concept_file = open(f"{self.crawlspace}/concept_file.json", "w") - for concept_id, concept in self.concepts.items(): - # Use TranQL queries to fetch knowledge graphs containing related but not synonymous biological terms - self.expand_concept(concept) - - # Traverse identifiers to create single list of of search targets/synonyms for concept - concept.set_search_terms() - - # Traverse kg answers to create list of optional search targets containing related concepts - concept.set_optional_terms() - - # Remove duplicate search terms and optional search terms - concept.clean() - - # Write concept out to a file - concept_file.write(f"{json.dumps(concept.get_searchable_dict(), indent=2)}") - - # Close concept file - concept_file.close() - - def annotate_elements(self): - - # Open variable file for writing - variable_file = open(f"{self.crawlspace}/element_file.json", "w") - - # Annotate elements/concepts and create new concepts based on the ontology identifiers returned - for element in self.elements: - # If element is actually a pre-loaded concept (e.g. TOPMed Tag), add that to list of concepts - if isinstance(element, parsers.DugConcept): - self.concepts[element.id] = element - - # Annotate element with normalized ontology identifiers - self.annotate_element(element) - if isinstance(element, parsers.DugElement): - variable_file.write(f"{element}\n") - - # Now that we have our concepts and elements fully annotated, we need to - # Make sure elements inherit the identifiers from their user-defined parent concepts - # E.g. TOPMedTag1 was annotated with HP:123 and MONDO:12. 
- # Each element assigned to TOPMedTag1 needs to be associated with those concepts as well - for element in self.elements: - # Skip user-defined concepts - if isinstance(element, parsers.DugConcept): - continue - - # Associate identifiers from user-defined concepts (see example above) - # with child elements of those concepts - concepts_to_add = [] - for concept_id, concept in element.concepts.items(): - for ident_id, identifier in concept.identifiers.items(): - if ident_id not in element.concepts and ident_id in self.concepts: - concepts_to_add.append(self.concepts[ident_id]) - - for concept_to_add in concepts_to_add: - element.add_concept(concept_to_add) - - # Write elements out to file - variable_file.close() - - def annotate_element(self, element): - # Annotate with a set of normalized ontology identifiers - identifiers = self.annotator.annotate(text=element.ml_ready_desc, - http_session=self.http_session) - - # Each identifier then becomes a concept that links elements together - for identifier in identifiers: - if identifier.id not in self.concepts: - # Create concept for newly seen identifier - concept = parsers.DugConcept(concept_id=identifier.id, - name=identifier.label, - desc=identifier.description, - concept_type=identifier.type) - # Add to list of concepts - self.concepts[identifier.id] = concept - - # Add identifier to list of identifiers associated with concept - self.concepts[identifier.id].add_identifier(identifier) - - # Create association between newly created concept and element - # (unless element is actually a user-defined concept) - if isinstance(element, parsers.DugElement): - element.add_concept(self.concepts[identifier.id]) - - # If element is actually a user defined concept (e.g. TOPMedTag), associate ident with concept - # Child elements of these user-defined concepts will inherit all these identifiers as well. - elif isinstance(element, parsers.DugConcept): - element.add_identifier(identifier) - - def expand_concept(self, concept): - - # Get knowledge graphs of terms related to each identifier - for ident_id, identifier in concept.identifiers.items(): - - # Conditionally skip some identifiers if they are listed in config - if ident_id in self.exclude_identifiers: - continue - - # Use pre-defined queries to search for related knowledge graphs that include the identifier - for query_name, query_factory in self.tranql_queries.items(): - - # Skip query if the identifier is not a valid query for the query class - if not query_factory.is_valid_curie(ident_id): - logger.info(f"identifier {ident_id} is not valid for query type {query_name}. 
Skipping!") - continue - - # Fetch kg and answer - kg_outfile = f"{self.crawlspace}/{ident_id}_{query_name}.json" - answers = self.tranqlizer.expand_identifier(ident_id, query_factory, kg_outfile) - - # Add any answer knowledge graphs to - for answer in answers: - concept.add_kg_answer(answer, query_name=query_name) - - -def get_parser(parser_type): - # User parser factor to get a specific type of parser - try: - return parsers.factory.create(parser_type) - except ValueError: - # If the parser type doesn't exist throw a more helpful exception than just value error - err_msg = f"Cannot find parser of type '{parser_type}'\n" \ - f"Supported parsers: {', '.join(parsers.factory.get_builder_types())}" - logger.error(err_msg) - raise ParserNotFoundException(err_msg) - - -class Dug: - concepts_index = "concepts_index" - variables_index = "variables_index" - kg_index = "kg_index" - - def __init__(self): - self._search = Search( - host=cfg.elasticsearch_host, - port=cfg.elasticsearch_port, - indices=[self.concepts_index, self.variables_index, self.kg_index] - ) - - def crawl(self, target_name: str, parser_type: str, element_type: str = None): - - target = Path(target_name).resolve() - parser = get_parser(parser_type) - self._crawl(target, parser, element_type) - - def _crawl(self, target: Path, parser, element_type): - - if target.is_file(): - self._crawl_file(target, parser, element_type) - else: - for child in target.iterdir(): - try: - self._crawl(child, parser, element_type) - except Exception as e: - logger.error(f"Unexpected {e.__class__.__name__} crawling {child}:{e}") - continue - - def _crawl_file(self, target: Path, parser, element_type): - - # Configure redis so we can fetch things from cache when needed - redis_connection = redis.StrictRedis(host=cfg.redis_host, - port=cfg.redis_port, - password=cfg.redis_password) - - http_session = CachedSession(cache_name='annotator', - backend='redis', - connection=redis_connection) - - # Create annotation engine for fetching ontology terms based on element text - preprocessor = anno.Preprocessor(**cfg.preprocessor) - annotator = anno.Annotator(**cfg.annotator) - normalizer = anno.Normalizer(**cfg.normalizer) - synonym_finder = anno.SynonymFinder(**cfg.synonym_service) - ontology_helper = anno.OntologyHelper(**cfg.ontology_helper) - tranqlizer = anno.ConceptExpander(**cfg.concept_expander) - - # Greenlist of ontology identifiers that can fail normalization and still be valid - ontology_greenlist = cfg.ontology_greenlist if hasattr(cfg, "ontology_greenlist") else [] - - # DugAnnotator combines all annotation components into single annotator - dug_annotator = anno.DugAnnotator(preprocessor=preprocessor, - annotator=annotator, - normalizer=normalizer, - synonym_finder=synonym_finder, - ontology_helper=ontology_helper, - ontology_greenlist=ontology_greenlist) - - # Initialize crawler - crawler = Crawler(crawl_file=str(target), - parser=parser, - annotator=dug_annotator, - tranqlizer=tranqlizer, - tranql_queries=cfg.tranql_queries, - http_session=http_session, - exclude_identifiers=cfg.tranql_exclude_identifiers, - element_type=element_type) - - # Read elements, annotate, and expand using tranql queries - crawler.crawl() - - # Index Annotated Elements - for element in crawler.elements: - # Only index DugElements as concepts will be indexed differently in next step - if not isinstance(element, parsers.DugConcept): - self._search.index_element(element, index=self.variables_index) - - # Index Annotated/TranQLized Concepts and associated knowledge graphs - 
for concept_id, concept in crawler.concepts.items(): - self._search.index_concept(concept, index=self.concepts_index) - - # Index knowledge graph answers for each concept - for kg_answer_id, kg_answer in concept.kg_answers.items(): - self._search.index_kg_answer(concept_id=concept_id, - kg_answer=kg_answer, - index=self.kg_index, - id_suffix=kg_answer_id) - - def search(self, target, query, **kwargs): - targets = { - 'concepts': partial( - self._search.search_concepts, index=kwargs.get('index', self.concepts_index)), - 'variables': partial( - self._search.search_variables, index=kwargs.get('index', self.variables_index), concept=kwargs.pop('concept', None)), - 'kg': partial( - self._search.search_kg, index=kwargs.get('index', self.kg_index), unique_id=kwargs.pop('unique_id', None)), - 'nboost': partial( - self._search.search_nboost, index=kwargs.get('index', None)), - } - kwargs.pop('index', None) - func = targets.get(target) - if func is None: - raise ValueError(f"Target must be one of {', '.join(targets.keys())}") - - return func(query=query, **kwargs) - - def status(self): - ... diff --git a/src/dug/core/__init__.py b/src/dug/core/__init__.py new file mode 100644 index 00000000..789ff5eb --- /dev/null +++ b/src/dug/core/__init__.py @@ -0,0 +1,106 @@ +import logging +import os +import sys +from functools import partial +from pathlib import Path +from typing import Iterable + +import pluggy +from dug.core.loaders.filesystem_loader import load_from_filesystem +from dug.core.loaders.network_loader import load_from_network + +from dug import hookspecs +from dug.config import Config +from dug.core import parsers +from dug.core.factory import DugFactory +from dug.core.parsers import DugConcept, Parser, get_parser + +logger = logging.getLogger('dug') +stdout_log_handler = logging.StreamHandler(sys.stdout) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +stdout_log_handler.setFormatter(formatter) +logger.addHandler(stdout_log_handler) + +logging.getLogger("elasticsearch").setLevel(logging.WARNING) + + +def get_plugin_manager() -> pluggy.PluginManager: + pm = pluggy.PluginManager("dug") + pm.add_hookspecs(hookspecs) + pm.load_setuptools_entrypoints("dug") + pm.register(parsers) + return pm + + +def get_targets(target_name) -> Iterable[Path]: + if target_name.startswith("http://") or target_name.startswith("https://"): + loader = partial(load_from_network, os.getenv("DUG_DATA_DIR", "data")) + else: + loader = load_from_filesystem + return loader(target_name) + + +class Dug: + concepts_index = "concepts_index" + variables_index = "variables_index" + kg_index = "kg_index" + + def __init__(self, factory: DugFactory): + self._factory = factory + self._search = self._factory.build_search_obj(indices=[ + self.concepts_index, self.variables_index, self.kg_index + ]) + + def crawl(self, target_name: str, parser_type: str, element_type: str = None): + + pm = get_plugin_manager() + parser = get_parser(pm.hook, parser_type) + targets = get_targets(target_name) + + for target in targets: + self._crawl(target, parser, element_type) + + def _crawl(self, target: Path, parser: Parser, element_type): + + # Initialize crawler + crawler = self._factory.build_crawler(target, parser, element_type) + # Read elements, annotate, and expand using tranql queries + crawler.crawl() + + # Index Annotated Elements + for element in crawler.elements: + # Only index DugElements as concepts will be indexed differently in next step + if not isinstance(element, DugConcept): + 
self._search.index_element(element, index=self.variables_index) + + # Index Annotated/TranQLized Concepts and associated knowledge graphs + for concept_id, concept in crawler.concepts.items(): + self._search.index_concept(concept, index=self.concepts_index) + + # Index knowledge graph answers for each concept + for kg_answer_id, kg_answer in concept.kg_answers.items(): + self._search.index_kg_answer(concept_id=concept_id, + kg_answer=kg_answer, + index=self.kg_index, + id_suffix=kg_answer_id) + + def search(self, target, query, **kwargs): + targets = { + 'concepts': partial( + self._search.search_concepts, index=kwargs.get('index', self.concepts_index)), + 'variables': partial( + self._search.search_variables, index=kwargs.get('index', self.variables_index), concept=kwargs.pop('concept', None)), + 'kg': partial( + self._search.search_kg, index=kwargs.get('index', self.kg_index), unique_id=kwargs.pop('unique_id', None)), + 'nboost': partial( + self._search.search_nboost, index=kwargs.get('index', None)), + } + kwargs.pop('index', None) + func = targets.get(target) + if func is None: + raise ValueError(f"Target must be one of {', '.join(targets.keys())}") + + return func(query=query, **kwargs) + + def status(self): + ... diff --git a/src/dug/annotate.py b/src/dug/core/annotate.py similarity index 98% rename from src/dug/annotate.py rename to src/dug/core/annotate.py index e9ceba83..e792d56c 100644 --- a/src/dug/annotate.py +++ b/src/dug/core/annotate.py @@ -1,14 +1,13 @@ import json import logging -import urllib.parse import os -from copy import copy -from typing import TypeVar, Generic, Union, List, Optional, Tuple +import urllib.parse +from typing import TypeVar, Generic, Union, List, Tuple import requests from requests import Session -import dug.tranql as tql +import dug.core.tranql as tql logger = logging.getLogger('dug') @@ -142,13 +141,17 @@ def expand_identifier(self, identifier, query_factory, kg_filename): response = json.load(stream) else: query = query_factory.get_query(identifier) - logger.info(query) + logger.debug(query) response = requests.post( url=self.url, headers=self.tranql_headers, data=query).json() # Case: Skip if empty KG + logger.debug(response) + if 'knowledge_graph' not in response: + logger.debug(f"Did not find a knowledge graph for {query}") + return [] if not len(response['knowledge_graph']['nodes']): logger.debug(f"Did not find a knowledge graph for {query}") return [] diff --git a/src/dug/core/crawler.py b/src/dug/core/crawler.py new file mode 100644 index 00000000..02f59a3a --- /dev/null +++ b/src/dug/core/crawler.py @@ -0,0 +1,167 @@ +import json +import logging +import os +import traceback + +from dug.core.parsers import Parser, DugElement, DugConcept + +logger = logging.getLogger('dug') + + +class Crawler: + def __init__(self, crawl_file: str, parser: Parser, annotator, + tranqlizer, tranql_queries, + http_session, exclude_identifiers=None, element_type=None): + + if exclude_identifiers is None: + exclude_identifiers = [] + + self.crawl_file = crawl_file + self.parser: Parser = parser + self.element_type = element_type + self.annotator = annotator + self.tranqlizer = tranqlizer + self.tranql_queries = tranql_queries + self.http_session = http_session + self.exclude_identifiers = exclude_identifiers + self.elements = [] + self.concepts = {} + self.crawlspace = "crawl" + + def make_crawlspace(self): + if not os.path.exists(self.crawlspace): + try: + os.makedirs(self.crawlspace) + except Exception as e: + print(f"-----------> {e}") + traceback.print_exc() + + 
def crawl(self): + + # Create directory for storing temporary results + self.make_crawlspace() + + # Read in elements from parser + self.elements = self.parser(self.crawl_file) + + # Optionally coerce all elements to be a specific type + for element in self.elements: + if isinstance(element, DugElement) and self.element_type is not None: + element.type = self.element_type + + # Annotate elements + self.annotate_elements() + + # Expand concepts + concept_file = open(f"{self.crawlspace}/concept_file.json", "w") + for concept_id, concept in self.concepts.items(): + # Use TranQL queries to fetch knowledge graphs containing related but not synonymous biological terms + self.expand_concept(concept) + + # Traverse identifiers to create single list of of search targets/synonyms for concept + concept.set_search_terms() + + # Traverse kg answers to create list of optional search targets containing related concepts + concept.set_optional_terms() + + # Remove duplicate search terms and optional search terms + concept.clean() + + # Write concept out to a file + concept_file.write(f"{json.dumps(concept.get_searchable_dict(), indent=2)}") + + # Close concept file + concept_file.close() + + def annotate_elements(self): + + # Open variable file for writing + variable_file = open(f"{self.crawlspace}/element_file.json", "w") + + # Annotate elements/concepts and create new concepts based on the ontology identifiers returned + for element in self.elements: + # If element is actually a pre-loaded concept (e.g. TOPMed Tag), add that to list of concepts + if isinstance(element, DugConcept): + self.concepts[element.id] = element + + # Annotate element with normalized ontology identifiers + self.annotate_element(element) + if isinstance(element, DugElement): + variable_file.write(f"{element}\n") + + # Now that we have our concepts and elements fully annotated, we need to + # Make sure elements inherit the identifiers from their user-defined parent concepts + # E.g. TOPMedTag1 was annotated with HP:123 and MONDO:12. 
+ # Each element assigned to TOPMedTag1 needs to be associated with those concepts as well + for element in self.elements: + # Skip user-defined concepts + if isinstance(element, DugConcept): + continue + + # Associate identifiers from user-defined concepts (see example above) + # with child elements of those concepts + concepts_to_add = [] + for concept_id, concept in element.concepts.items(): + for ident_id, identifier in concept.identifiers.items(): + if ident_id not in element.concepts and ident_id in self.concepts: + concepts_to_add.append(self.concepts[ident_id]) + + for concept_to_add in concepts_to_add: + element.add_concept(concept_to_add) + + # Write elements out to file + variable_file.close() + + def annotate_element(self, element): + # Annotate with a set of normalized ontology identifiers + identifiers = self.annotator.annotate(text=element.ml_ready_desc, + http_session=self.http_session) + + # Each identifier then becomes a concept that links elements together + for identifier in identifiers: + if identifier.id not in self.concepts: + # Create concept for newly seen identifier + concept = DugConcept(concept_id=identifier.id, + name=identifier.label, + desc=identifier.description, + concept_type=identifier.type) + # Add to list of concepts + self.concepts[identifier.id] = concept + + # Add identifier to list of identifiers associated with concept + self.concepts[identifier.id].add_identifier(identifier) + + # Create association between newly created concept and element + # (unless element is actually a user-defined concept) + if isinstance(element, DugElement): + element.add_concept(self.concepts[identifier.id]) + + # If element is actually a user defined concept (e.g. TOPMedTag), associate ident with concept + # Child elements of these user-defined concepts will inherit all these identifiers as well. + elif isinstance(element, DugConcept): + element.add_identifier(identifier) + + def expand_concept(self, concept): + + # Get knowledge graphs of terms related to each identifier + for ident_id, identifier in concept.identifiers.items(): + + # Conditionally skip some identifiers if they are listed in config + if ident_id in self.exclude_identifiers: + continue + + # Use pre-defined queries to search for related knowledge graphs that include the identifier + for query_name, query_factory in self.tranql_queries.items(): + + # Skip query if the identifier is not a valid query for the query class + if not query_factory.is_valid_curie(ident_id): + logger.info(f"identifier {ident_id} is not valid for query type {query_name}. 
Skipping!") + continue + + # Fetch kg and answer + kg_outfile = f"{self.crawlspace}/{ident_id}_{query_name}.json" + answers = self.tranqlizer.expand_identifier(ident_id, query_factory, kg_outfile) + + # Add any answer knowledge graphs to + for answer in answers: + concept.add_kg_answer(answer, query_name=query_name) \ No newline at end of file diff --git a/src/dug/core/factory.py b/src/dug/core/factory.py new file mode 100644 index 00000000..67342f95 --- /dev/null +++ b/src/dug/core/factory.py @@ -0,0 +1,79 @@ +from typing import Dict + +import redis +from requests_cache import CachedSession + +import dug.core.tranql as tql +from dug.core.annotate import DugAnnotator, Annotator, Normalizer, OntologyHelper, Preprocessor, SynonymFinder, \ + ConceptExpander +from dug.config import Config as DugConfig, TRANQL_SOURCE +from dug.core.crawler import Crawler +from dug.core.parsers import Parser +from dug.core.search import Search + + +class DugFactory: + + def __init__(self, config: DugConfig): + self.config = config + + def build_http_session(self) -> CachedSession: + + redis_config = { + 'host': self.config.redis_host, + 'port': self.config.redis_port, + 'password': self.config.redis_password, + } + + return CachedSession( + cache_name='annotator', + backend='redis', + connection=redis.StrictRedis(**redis_config) + ) + + def build_crawler(self, target, parser: Parser, element_type: str, tranql_source=None) -> Crawler: + crawler = Crawler( + crawl_file=str(target), + parser=parser, + annotator=self.build_annotator(), + tranqlizer=self.build_tranqlizer(), + tranql_queries=self.build_tranql_queries(tranql_source), + http_session=self.build_http_session(), + exclude_identifiers=self.config.tranql_exclude_identifiers, + element_type=element_type + ) + + return crawler + + def build_annotator(self) -> DugAnnotator: + + preprocessor = Preprocessor(**self.config.preprocessor) + annotator = Annotator(**self.config.annotator) + normalizer = Normalizer(**self.config.normalizer) + synonym_finder = SynonymFinder(**self.config.synonym_service) + ontology_helper = OntologyHelper(**self.config.ontology_helper) + + annotator = DugAnnotator( + preprocessor=preprocessor, + annotator=annotator, + normalizer=normalizer, + synonym_finder=synonym_finder, + ontology_helper=ontology_helper + ) + + return annotator + + def build_tranqlizer(self) -> ConceptExpander: + return ConceptExpander(**self.config.concept_expander) + + def build_tranql_queries(self, source=None) -> Dict[str, tql.QueryFactory]: + if source is None: + source = TRANQL_SOURCE + return { + key: tql.QueryFactory(self.config.tranql_queries[key], source) + for key + in self.config.tranql_queries + } + + def build_search_obj(self, indices) -> Search: + return Search(self.config, indices=indices) diff --git a/src/dug/core/loaders/__init__.py b/src/dug/core/loaders/__init__.py new file mode 100644 index 00000000..c1648cd7 --- /dev/null +++ b/src/dug/core/loaders/__init__.py @@ -0,0 +1 @@ +from ._base import InputFile, Loader diff --git a/src/dug/core/loaders/_base.py b/src/dug/core/loaders/_base.py new file mode 100644 index 00000000..1ee15b16 --- /dev/null +++ b/src/dug/core/loaders/_base.py @@ -0,0 +1,6 @@ +from pathlib import Path +from typing import Union, Iterable, Callable, Iterator + +InputFile = Union[str, Path] + +Loader = Callable[[str], Iterator[Path]] diff --git a/src/dug/core/loaders/filesystem_loader.py b/src/dug/core/loaders/filesystem_loader.py new file mode 100644 index 00000000..7a606474 --- /dev/null +++ 
b/src/dug/core/loaders/filesystem_loader.py @@ -0,0 +1,17 @@ +from pathlib import Path +from typing import Iterator + +from ._base import InputFile + + +def load_from_filesystem(filepath: InputFile) -> Iterator[Path]: + + filepath = Path(filepath) + + if not filepath.exists(): + raise ValueError(f"Unable to locate {filepath}") + + if filepath.is_file(): + yield filepath + else: + yield from filepath.glob("**/*") diff --git a/src/dug/core/loaders/network_loader.py b/src/dug/core/loaders/network_loader.py new file mode 100644 index 00000000..0bf5ccc2 --- /dev/null +++ b/src/dug/core/loaders/network_loader.py @@ -0,0 +1,32 @@ +import logging +from pathlib import Path +from typing import Iterator +from urllib.parse import urlparse + +import requests + +from ._base import InputFile + +logger = logging.getLogger('dug') + + +def load_from_network(data_storage_dir: InputFile, urls: str) -> Iterator[Path]: + data_storage_dir = Path(data_storage_dir).resolve() + url_list = urls.split(",") + for url in url_list: + logger.info(f"Fetching {url}") + + parse_result = urlparse(url) + response = requests.get(url) + + if not response.ok: + raise ValueError(f"Could not fetch {url}: {response.status_code}, {response.text}") + + nonroot_path = parse_result.path.lstrip('/') + + output_location = data_storage_dir / parse_result.netloc / nonroot_path + output_location.parent.mkdir(parents=True, exist_ok=True) + + output_location.write_text(response.text) + + yield output_location \ No newline at end of file diff --git a/src/dug/core/parsers/__init__.py b/src/dug/core/parsers/__init__.py new file mode 100644 index 00000000..00f1aaab --- /dev/null +++ b/src/dug/core/parsers/__init__.py @@ -0,0 +1,37 @@ +import logging +from typing import Dict + +import pluggy + +from ._base import DugElement, DugConcept, Indexable, Parser, FileParser +from .dbgap_parser import DbGaPParser +from .topmed_tag_parser import TOPMedTagParser + +logger = logging.getLogger('dug') + +hookimpl = pluggy.HookimplMarker("dug") + + +@hookimpl +def define_parsers(parser_dict: Dict[str, Parser]): + parser_dict["dbgap"] = DbGaPParser() + parser_dict["topmedtag"] = TOPMedTagParser() + + +class ParserNotFoundException(Exception): + ... + + +def get_parser(hook, parser_name) -> Parser: + """Get the parser from all parsers registered via the define_parsers hook""" + + available_parsers = {} + hook.define_parsers(parser_dict=available_parsers) + parser = available_parsers.get(parser_name.lower()) + if parser is not None: + return parser + + err_msg = f"Cannot find parser of type '{parser_name}'\n" \ + f"Supported parsers: {', '.join(available_parsers.keys())}" + logger.error(err_msg) + raise ParserNotFoundException(err_msg) diff --git a/src/dug/core/parsers/_base.py b/src/dug/core/parsers/_base.py new file mode 100644 index 00000000..a3943cbc --- /dev/null +++ b/src/dug/core/parsers/_base.py @@ -0,0 +1,124 @@ +import json +from typing import Union, Callable, Any, Iterable + +from dug.core.loaders import InputFile + +from dug import utils as utils + + +class DugElement: + # Basic class for holding information for an object you want to make searchable via Dug + # Could be a DbGaP variable, DICOM image, App, or really anything + # Optionally can hold information pertaining to a containing collection (e.g. 
dbgap study or dicom image series) + def __init__(self, elem_id, name, desc, elem_type, collection_id="", collection_name="", collection_desc=""): + self.id = elem_id + self.name = name + self.description = desc + self.type = elem_type + self.collection_id = collection_id + self.collection_name = collection_name + self.collection_desc = collection_desc + self.action = "" + self.collection_action = "" + self.concepts = {} + self.ml_ready_desc = desc + + def add_concept(self, concept): + self.concepts[concept.id] = concept + + def jsonable(self): + return self.__dict__ + + def get_searchable_dict(self): + # Translate DugElement to ES-style dict + es_elem = { + 'element_id': self.id, + 'element_name': self.name, + 'element_desc': self.description, + 'collection_id': self.collection_id, + 'collection_name': self.collection_name, + 'collection_desc': self.collection_desc, + 'element_action': self.action, + 'collection_action': self.collection_action, + 'data_type': self.type, + 'identifiers': list(self.concepts.keys()) + } + return es_elem + + def __str__(self): + return json.dumps(self.__dict__, indent=2, default=utils.complex_handler) + + +class DugConcept: + # Basic class for holding information about concepts that are used to organize elements + # All Concepts map to at least one element + def __init__(self, concept_id, name, desc, concept_type): + self.id = concept_id + self.name = name + self.description = desc + self.type = concept_type + self.concept_action = "" + self.identifiers = {} + self.kg_answers = {} + self.search_terms = [] + self.optional_terms = [] + self.ml_ready_desc = desc + + def add_identifier(self, ident): + if ident.id in self.identifiers: + for search_text in ident.search_text: + self.identifiers[ident.id].add_search_text(search_text) + else: + self.identifiers[ident.id] = ident + + def add_kg_answer(self, answer, query_name): + answer_node_ids = list(answer.nodes.keys()) + answer_id = f'{"_".join(answer_node_ids)}_{query_name}' + if answer_id not in self.kg_answers: + self.kg_answers[answer_id] = answer + + def clean(self): + self.search_terms = list(set(self.search_terms)) + self.optional_terms = list(set(self.optional_terms)) + + def set_search_terms(self): + # Traverse set of identifiers to determine set of search terms + search_terms = self.search_terms + for ident_id, ident in self.identifiers.items(): + search_terms.extend([ident.label, ident.description] + ident.search_text + ident.synonyms) + self.search_terms = list(set(search_terms)) + + def set_optional_terms(self): + # Traverse set of knowledge graph answers to determine set of optional search terms + optional_terms = self.optional_terms + for kg_id, kg_answer in self.kg_answers.items(): + optional_terms += kg_answer.get_node_names() + optional_terms += kg_answer.get_node_synonyms() + self.optional_terms = list(set(optional_terms)) + + def get_searchable_dict(self): + # Translate DugConcept into Elastic-Compatible Concept + es_conc = { + 'id': self.id, + 'name': self.name, + 'description': self.description, + 'type': self.type, + 'search_terms': self.search_terms, + 'optional_terms': self.optional_terms, + 'concept_action': self.concept_action, + 'identifiers': [ident.get_searchable_dict() for ident_id, ident in self.identifiers.items()] + } + return es_conc + + def jsonable(self): + return self.__dict__ + + def __str__(self): + return json.dumps(self.__dict__, indent=2, default=utils.complex_handler) + + +Indexable = Union[DugElement, DugConcept] +Parser = Callable[[Any], Iterable[Indexable]] + + 
+FileParser = Callable[[InputFile], Iterable[Indexable]] diff --git a/src/dug/core/parsers/dbgap_parser.py b/src/dug/core/parsers/dbgap_parser.py new file mode 100644 index 00000000..4cdc13ed --- /dev/null +++ b/src/dug/core/parsers/dbgap_parser.py @@ -0,0 +1,57 @@ +import logging +import re +from typing import List +from xml.etree import ElementTree as ET + +from dug import utils as utils +from ._base import DugElement, FileParser, Indexable, InputFile + +logger = logging.getLogger('dug') + + +class DbGaPParser(FileParser): + # Class for parsers DBGaP Data dictionary into a set of Dug Elements + + @staticmethod + def parse_study_name_from_filename(filename: str): + # Parse the study name from the xml filename if it exists. Return None if filename isn't right format to get id from + dbgap_file_pattern = re.compile(r'.*/*phs[0-9]+\.v[0-9]\.pht[0-9]+\.v[0-9]\.(.+)\.data_dict.*') + match = re.match(dbgap_file_pattern, filename) + if match is not None: + return match.group(1) + return None + + def __call__(self, input_file: InputFile) -> List[Indexable]: + logger.debug(input_file) + tree = ET.parse(input_file) + root = tree.getroot() + study_id = root.attrib['study_id'] + participant_set = root.attrib['participant_set'] + + # Parse study name from file handle + study_name = self.parse_study_name_from_filename(str(input_file)) + + if study_name is None: + err_msg = f"Unable to parse DbGaP study name from data dictionary: {input_file}!" + logger.error(err_msg) + raise IOError(err_msg) + + elements = [] + for variable in root.iter('variable'): + elem = DugElement(elem_id=f"{variable.attrib['id']}.p{participant_set}", + name=variable.find('name').text, + desc=variable.find('description').text.lower(), + elem_type="DbGaP", + collection_id=f"{study_id}.p{participant_set}", + collection_name=study_name) + + # Create DBGaP links as study/variable actions + elem.collection_action = utils.get_dbgap_study_link(study_id=elem.collection_id) + elem.action = utils.get_dbgap_var_link(study_id=elem.collection_id, + variable_id=elem.id.split(".")[0].split("phv")[1]) + # Add to set of variables + logger.debug(elem) + elements.append(elem) + + # You don't actually create any concepts + return elements diff --git a/src/dug/core/parsers/topmed_tag_parser.py b/src/dug/core/parsers/topmed_tag_parser.py new file mode 100644 index 00000000..448a84b5 --- /dev/null +++ b/src/dug/core/parsers/topmed_tag_parser.py @@ -0,0 +1,83 @@ +import csv +import json +import logging +import os +from typing import List + +from dug import utils as utils +from ._base import DugConcept, DugElement, FileParser, Indexable, InputFile + +logger = logging.getLogger('dug') + + +class TOPMedTagParser(FileParser): + + def __call__(self, input_file: InputFile) -> List[Indexable]: + """ + Load tagged variables. + Presumes a harmonized variable list as a CSV file as input. + A naming convention such that _variables_.csv will be the form of the filename. 
+ An adjacent file called _tags_study + * phenotype->disease->study + * disease->study + * disease->phenotype->study + """ + + def __init__(self, cfg: Config, indices=None): + + if indices is None: + indices = ['concepts_index', 'variables_index', 'kg_index'] + + self._cfg = cfg + logger.debug(f"Connecting to elasticsearch host: {self._cfg.elastic_host} at port: {self._cfg.elastic_port}") + + self.indices = indices + self.hosts = [{'host': self._cfg.elastic_host, 'port': self._cfg.elastic_port}] + + logger.debug(f"Authenticating as user {self._cfg.elastic_username} to host:{self.hosts}") + + self.es = Elasticsearch(hosts=self.hosts, + http_auth=(self._cfg.elastic_username, self._cfg.elastic_password)) + + if self.es.ping(): + logger.info('connected to elasticsearch') + self.init_indices() + else: + print(f"Unable to connect to elasticsearch at {self._cfg.elastic_host}:{self._cfg.elastic_port}") + logger.error(f"Unable to connect to elasticsearch at {self._cfg.elastic_host}:{self._cfg.elastic_port}") + raise SearchException( + message='failed to connect to elasticsearch', + details=f"connecting to host {self._cfg.elastic_host} and port {self._cfg.elastic_port}") + + def init_indices(self): + kg_index = { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "type": { + "type": "text" + } + } + } + } + concepts_index = { + "settings": { + "index.mapping.coerce": "false", + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "dynamic": "strict", + "properties": { + "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, + "name": {"type": "text"}, + "description": {"type": "text"}, + "type": {"type": "keyword"}, + "search_terms": {"type": "text"}, + "identifiers": { + "properties": { + "id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, + "label": {"type": "text"}, + "equivalent_identifiers": {"type": "keyword"}, + "type": {"type": "keyword"}, + "synonyms": {"type": "text"} + } + }, + "optional_terms": {"type": "text"}, + "concept_action": {"type": "text"} + } + } + } + variables_index = { + "settings": { + "index.mapping.coerce": "false", + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "dynamic": "strict", + "properties": { + "element_id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, + "element_name": {"type": "text"}, + "element_desc": {"type": "text"}, + "element_action": {"type": "text"}, + "search_terms": {"type": "text"}, + "identifiers": {"type": "keyword"}, + "collection_id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}}, + "collection_name": {"type": "text"}, + "collection_desc": {"type": "text"}, + "collection_action": {"type": "text"}, + "data_type": {"type": "text", "fields": {"keyword": {"type": "keyword"}}} + # typed as keyword for bucket aggs + } + } + } + + settings = { + 'kg_index': kg_index, + 'concepts_index': concepts_index, + 'variables_index': variables_index, + } + + logger.info(f"creating indices") + logger.debug(self.indices) + for index in self.indices: + try: + if self.es.indices.exists(index=index): + logger.info(f"Ignoring index {index} which already exists.") + else: + result = self.es.indices.create( + index=index, + body=settings[index], + ignore=400) + logger.info(f"result created index {index}: {result}") + except Exception as e: + logger.error(f"exception: {e}") + raise e + + def index_doc(self, index, doc, doc_id): + self.es.index( + index=index, + id=doc_id, + 
            body=doc)

+    def update_doc(self, index, doc, doc_id):
+        self.es.update(
+            index=index,
+            id=doc_id,
+            body=doc
+        )
+
+    def search_concepts(self, index, query, offset=0, size=None, fuzziness=1, prefix_length=3):
+        """
+        Uses a query_string query so that AND/OR operators and quoted exact
+        matches are supported in the query text.
+        """
+        query = {
+            'query_string': {
+                'query': query,
+                'fuzziness': fuzziness,
+                'fuzzy_prefix_length': prefix_length,
+                'fields': ["name", "description", "search_terms", "optional_terms"],
+                'quote_field_suffix': ".exact"
+            },
+        }
+        body = json.dumps({'query': query})
+        total_items = self.es.count(body=body, index=index)
+        search_results = self.es.search(
+            index=index,
+            body=body,
+            filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'],
+            from_=offset,
+            size=size
+        )
+        search_results.update({'total_items': total_items['count']})
+        return search_results
+
+    def search_variables(self, index, concept, query, size=None, data_type=None, offset=0, fuzziness=1,
+                         prefix_length=3):
+        """
+        In variable search, the concept MUST match one of the identifiers in the list.
+        The query can match search_terms (hence, "should") for ranking.
+
+        Results
+            The search results are returned in JSON format: {collection_id: [elements]}
+
+        Filter
+            If a data_type is passed in, the result will be filtered to only contain
+            the passed-in data type.
+        """
+        query = {
+            'bool': {
+                'must': {
+                    "match": {
+                        "identifiers": concept
+                    }
+                },
+                'should': {
+                    'query_string': {
+                        "query": query,
+                        "fuzziness": fuzziness,
+                        "fuzzy_prefix_length": prefix_length,
+                        "default_field": "search_terms"
+                    }
+                }
+            }
+        }
+        body = json.dumps({'query': query})
+        search_results = self.es.search(
+            index=index,
+            body=body,
+            filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'],
+            from_=offset,
+            size=size
+        )
+
+        # Reformat results
+        new_results = {}
+        for elem in search_results['hits']['hits']:
+            elem_s = elem['_source']
+            elem_type = elem_s['data_type']
+            if elem_type not in new_results:
+                new_results[elem_type] = {}
+
+            elem_id = elem_s['element_id']
+            coll_id = elem_s['collection_id']
+            elem_info = {
+                "description": elem_s['element_desc'],
+                "e_link": elem_s['element_action'],
+                "id": elem_id,
+                "name": elem_s['element_name']
+            }
+
+            # Case: collection not in dictionary for given data_type
+            if coll_id not in new_results[elem_type]:
+                # initialize document
+                doc = {}
+
+                # add information
+                doc['c_id'] = coll_id
+                doc['c_link'] = elem_s['collection_action']
+                doc['c_name'] = elem_s['collection_name']
+                doc['elements'] = [elem_info]
+
+                # save document
+                new_results[elem_type][coll_id] = doc
+
+            # Case: collection already in dictionary for given data_type; append elem_info.
+            # Assumes no duplicate elements.
+            else:
+                new_results[elem_type][coll_id]['elements'].append(elem_info)
+
+        # Flatten dicts to list
+        for i in new_results:
+            new_results[i] = list(new_results[i].values())
+
+        # Return results
+        if bool(data_type):
+            if data_type in new_results:
+                new_results = new_results[data_type]
+            else:
+                new_results = {}
+        return new_results
+
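As an editorial aside (not part of the patch): a sketch of how the query methods above might be called, assuming a running Elasticsearch reachable through the environment-driven Config. The index names are the class defaults; the query text and the concept CURIE are hypothetical examples.

# Illustrative sketch only; assumes Elasticsearch is reachable via env-configured Config.
from dug.config import Config
from dug.core.search import Search

search = Search(Config.from_env())

# Free-text concept search; quotes force exact matches, AND/OR are supported.
concepts = search.search_concepts(index='concepts_index',
                                  query='asthma OR "lung disease"')

# Variable search: 'concept' must match an indexed identifier, the free-text
# query only influences ranking, and data_type optionally filters the result.
variables = search.search_variables(index='variables_index',
                                    concept='MONDO:0005148',  # hypothetical CURIE
                                    query='asthma',
                                    data_type='DbGaP')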
+ """ + aggs = { + "data_type": { + "terms": { + "field": "data_type.keyword", + "size": 100 + } + } + } + body = json.dumps({'aggs': aggs}) + + search_results = self.es.search( + index=index, + body=body, + size=size + ) + data_type_list = [data_type['key'] for data_type in search_results['aggregations']['data_type']['buckets']] + search_results.update({'data type list': data_type_list}) + return data_type_list + + def search_kg(self, index, unique_id, query, offset=0, size=None, fuzziness=1, prefix_length=3): + """ + In knowledge graph search seach, the concept MUST match the unique ID + The query MUST match search_targets. The updated query allows for + fuzzy matching and for the default OR behavior for the query. + """ + query = { + "bool": { + "must": [ + {"term": { + "concept_id.keyword": unique_id + } + }, + {'query_string': { + "query": query, + "fuzziness": fuzziness, + "fuzzy_prefix_length": prefix_length, + "default_field": "search_targets" + } + } + ] + } + } + body = json.dumps({'query': query}) + total_items = self.es.count(body=body, index=index) + search_results = self.es.search( + index=index, + body=body, + filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'], + from_=offset, + size=size + ) + search_results.update({'total_items': total_items['count']}) + return search_results + + def search_nboost(self, index, query, offset=0, size=10, fuzziness=1): + """ + Query type is now 'query_string'. + query searches multiple fields + if search terms are surrounded in quotes, looks for exact matches in any of the fields + AND/OR operators are natively supported by elasticesarch queries + """ + nboost_query = { + 'nboost': { + 'uhost': f"{self._cfg.elastic_username}:{self._cfg.elastic_password}@{self._cfg.elastic_host}", + 'uport': self._cfg.elastic_port, + 'cvalues_path': '_source.description', + 'query_path': 'body.query.query_string.query', + 'size': size, + 'from': offset, + 'default_topk': size + }, + 'query': { + 'query_string': { + 'query': query, + 'fuzziness': fuzziness, + 'fields': ['name', 'description', 'instructions', 'search_targets', 'optional_targets'], + 'quote_field_suffix': ".exact" + } + } + } + + return requests.post(url=f"http://{self._cfg.nboost_host}:{self._cfg.nboost_port}/{index}/_search", json=nboost_query).json() + + def index_concept(self, concept, index): + # Don't re-index if already in index + if self.es.exists(index, concept.id): + return + """ Index the document. 
""" + self.index_doc( + index=index, + doc=concept.get_searchable_dict(), + doc_id=concept.id) + + def index_element(self, elem, index): + if not self.es.exists(index, elem.id): + # If the element doesn't exist, add it directly + self.index_doc( + index=index, + doc=elem.get_searchable_dict(), + doc_id=elem.id) + else: + # Otherwise update to add any new identifiers that weren't there last time around + results = self.es.get(index, elem.id) + identifiers = results['_source']['identifiers'] + list(elem.concepts.keys()) + doc = {"doc": {}} + doc['doc']['identifiers'] = list(set(identifiers)) + self.update_doc(index=index, doc=doc, doc_id=elem.id) + + def index_kg_answer(self, concept_id, kg_answer, index, id_suffix=None): + + # Get search targets by extracting names/synonyms from non-curie nodes in answer knoweldge graph + search_targets = kg_answer.get_node_names(include_curie=False) + search_targets += kg_answer.get_node_synonyms(include_curie=False) + + # Create the Doc + doc = { + 'concept_id': concept_id, + 'search_targets': list(set(search_targets)), + 'knowledge_graph': kg_answer.get_kg() + } + + # Create unique ID + logger.debug("Indexing TranQL query answer...") + id_suffix = list(kg_answer.nodes.keys()) if id_suffix is None else id_suffix + unique_doc_id = f"{concept_id}_{id_suffix}" + + """ Index the document. """ + self.index_doc( + index=index, + doc=doc, + doc_id=unique_doc_id) + + +class SearchException(Exception): + def __init__(self, message, details): + self.message = message + self.details = details \ No newline at end of file diff --git a/src/dug/tranql.py b/src/dug/core/tranql.py similarity index 89% rename from src/dug/tranql.py rename to src/dug/core/tranql.py index 8126f2e3..35dcebac 100644 --- a/src/dug/tranql.py +++ b/src/dug/core/tranql.py @@ -13,11 +13,16 @@ class QueryKG: def __init__(self, kg_json): self.kg = kg_json self.answers = kg_json.get('knowledge_map', []) - self.question = kg_json["question_graph"] + self.question = kg_json.get("question_graph", {}) self.nodes = {node["id"]: node for node in kg_json.get('knowledge_graph', {}).get('nodes', [])} self.edges = {edge["id"]: edge for edge in kg_json.get('knowledge_graph', {}).get('edges', [])} - def get_answer_subgraph(self, answer, include_node_keys=[], include_edge_keys=[]): + def get_answer_subgraph(self, answer, include_node_keys=None, include_edge_keys=None): + if include_node_keys is None: + include_node_keys = [] + + if include_edge_keys is None: + include_edge_keys = [] # Get answer nodes answer_nodes = [] @@ -27,7 +32,7 @@ def get_answer_subgraph(self, answer, include_node_keys=[], include_edge_keys=[] # Throw error if node doesn't actually exist in 'nodes' section of knowledge graph if answer_node not in self.nodes: err_msg = f"Unable to assemble subraph for answer:\n{json.dumps(answer, indent=2)}\n" \ - f"Parent graph doesn't contain node info for: {answer_node}" + f"Parent graph doesn't contain node info for: {answer_node}" raise MissingNodeReferenceError(err_msg) # Add only node info that you actually want @@ -41,7 +46,7 @@ def get_answer_subgraph(self, answer, include_node_keys=[], include_edge_keys=[] # Throw error if edge doesn't actually exist in 'edges' section of knowledge graph if answer_edge not in self.edges: err_msg = f"Unable to assemble subraph for answer:\n{json.dumps(answer, indent=2)}\n" \ - f"Parent graph doesn't contain edge info for: {answer_edge}" + f"Parent graph doesn't contain edge info for: {answer_edge}" raise MissingEdgeReferenceError(err_msg) # Add only information from 
edge that you actually want @@ -54,16 +59,21 @@ def get_answer_subgraph(self, answer, include_node_keys=[], include_edge_keys=[] return QueryKG(kg) - def get_node(self, node_id, include_node_keys=[]): + def get_node(self, node_id, include_node_keys=None): + if include_node_keys is None: + include_node_keys = [] # Return node with optionally subsetted information node = self.nodes[node_id] # Optionally subset to get only certain information columns if include_node_keys: - node = {key: node[key] for key in include_node_keys} + node = {key: node.get(key, []) for key in include_node_keys} return node - def get_edge(self, edge_id, include_edge_keys=[]): + def get_edge(self, edge_id, include_edge_keys=None): + + if include_edge_keys is None: + include_edge_keys = [] # Return edge with optionally subsetted information edge = self.edges[edge_id] @@ -102,7 +112,6 @@ class InvalidQueryError(BaseException): class QueryFactory: - # Class member list of valid data types that can be included in query data_types = ["phenotypic_feature", "gene", "disease", "chemical_substance", "drug_exposure", "biological_process", "anatomical_entity"] @@ -139,7 +148,7 @@ def __init__(self, question_graph, source, curie_index=0): def validate_factory(self): # Check to make sure all the question types are valid for question in self.question_graph: - if not question in QueryFactory.data_types: + if question not in QueryFactory.data_types: raise InvalidQueryError(f"Query contains invalid query type: {question}") def is_valid_curie(self, curie): diff --git a/src/dug/hookspecs.py b/src/dug/hookspecs.py new file mode 100644 index 00000000..3a02b9a9 --- /dev/null +++ b/src/dug/hookspecs.py @@ -0,0 +1,14 @@ +from typing import Dict + +import pluggy + +from dug.core.parsers import Parser + +hookspec = pluggy.HookspecMarker("dug") + + +@hookspec +def define_parsers(parser_dict: Dict[str, Parser]): + """Defines what parsers are available to Dug + """ + ... diff --git a/src/dug/parsers.py b/src/dug/parsers.py deleted file mode 100644 index 291d0239..00000000 --- a/src/dug/parsers.py +++ /dev/null @@ -1,240 +0,0 @@ -import csv -import json -import logging -import os -import xml.etree.ElementTree as ET -import dug.utils as utils - - -logger = logging.getLogger('dug') - - -class DugElement: - # Basic class for holding information for an object you want to make searchable via Dug - # Could be a DbGaP variable, DICOM image, App, or really anything - # Optionally can hold information pertaining to a containing collection (e.g. 
dbgap study or dicom image series) - def __init__(self, elem_id, name, desc, elem_type, collection_id="", collection_name="", collection_desc=""): - self.id = elem_id - self.name = name - self.description = desc - self.type = elem_type - self.collection_id = collection_id - self.collection_name = collection_name - self.collection_desc = collection_desc - self.action = "" - self.collection_action = "" - self.concepts = {} - self.ml_ready_desc = desc - - def add_concept(self, concept): - self.concepts[concept.id] = concept - - def jsonable(self): - return self.__dict__ - - def get_searchable_dict(self): - # Translate DugElement to ES-style dict - es_elem = { - 'element_id': self.id, - 'element_name': self.name, - 'element_desc': self.description, - 'collection_id': self.collection_id, - 'collection_name': self.collection_name, - 'collection_desc': self.collection_desc, - 'element_action': self.action, - 'collection_action': self.collection_action, - 'data_type': self.type, - 'identifiers': list(self.concepts.keys()) - } - return es_elem - - def __str__(self): - return json.dumps(self.__dict__, indent=2, default=utils.complex_handler) - - -class DugConcept: - # Basic class for holding information about concepts that are used to organize elements - # All Concepts map to at least one element - def __init__(self, concept_id, name, desc, concept_type): - self.id = concept_id - self.name = name - self.description = desc - self.type = concept_type - self.concept_action = "" - self.identifiers = {} - self.kg_answers = {} - self.search_terms = [] - self.optional_terms = [] - self.ml_ready_desc = desc - - def add_identifier(self, ident): - if ident.id in self.identifiers: - for search_text in ident.search_text: - self.identifiers[ident.id].add_search_text(search_text) - else: - self.identifiers[ident.id] = ident - - def add_kg_answer(self, answer, query_name): - answer_node_ids = list(answer.nodes.keys()) - answer_id = f'{"_".join(answer_node_ids)}_{query_name}' - if answer_id not in self.kg_answers: - self.kg_answers[answer_id] = answer - - def clean(self): - self.search_terms = list(set(self.search_terms)) - self.optional_terms = list(set(self.optional_terms)) - - def set_search_terms(self): - # Traverse set of identifiers to determine set of search terms - search_terms = self.search_terms - for ident_id, ident in self.identifiers.items(): - search_terms.extend([ident.label, ident.description] + ident.search_text + ident.synonyms) - self.search_terms = list(set(search_terms)) - - def set_optional_terms(self): - # Traverse set of knowledge graph answers to determine set of optional search terms - optional_terms = self.optional_terms - for kg_id, kg_answer in self.kg_answers.items(): - optional_terms += kg_answer.get_node_names() - optional_terms += kg_answer.get_node_synonyms() - self.optional_terms = list(set(optional_terms)) - - def get_searchable_dict(self): - # Translate DugConcept into Elastic-Compatible Concept - es_conc = { - 'id': self.id, - 'name': self.name, - 'description': self.description, - 'type': self.type, - 'search_terms': self.search_terms, - 'optional_terms': self.optional_terms, - 'concept_action': self.concept_action, - 'identifiers': [ident.get_searchable_dict() for ident_id, ident in self.identifiers.items()] - } - return es_conc - - def jsonable(self): - return self.__dict__ - - def __str__(self): - return json.dumps(self.__dict__, indent=2, default=utils.complex_handler) - - -class DbGaPParser: - # Class for parsing DBGaP Data dictionary into a set of Dug Elements - - 
@staticmethod - def parse(input_file): - logger.debug(input_file) - tree = ET.parse(input_file) - root = tree.getroot() - study_id = root.attrib['study_id'] - participant_set = root.attrib['participant_set'] - - # Parse study name from filehandle - study_name = utils.parse_study_name_from_filename(input_file) - - if study_name is None: - err_msg = f"Unable to parse DbGaP study name from data dictionary: {input_file}!" - logger.error(err_msg) - raise IOError(err_msg) - - elements = [] - for variable in root.iter('variable'): - elem = DugElement(elem_id=f"{variable.attrib['id']}.p{participant_set}", - name=variable.find('name').text, - desc=variable.find('description').text.lower(), - elem_type="DbGaP", - collection_id=f"{study_id}.p{participant_set}", - collection_name=study_name) - - # Create DBGaP links as study/variable actions - elem.collection_action = utils.get_dbgap_study_link(study_id=elem.collection_id) - elem.action = utils.get_dbgap_var_link(study_id=elem.collection_id, - variable_id=elem.id.split(".")[0].split("phv")[1]) - # Add to set of variables - logger.debug(elem) - elements.append(elem) - - # You don't actually create any concepts - return elements - - -class TOPMedTagParser: - - @staticmethod - def parse(input_file: str): - """ - Load tagged variables. - Presumes a harmonized variable list as a CSV file as input. - A naming convention such that _variables_.csv will be the form of the filename. - An adjacent file called _tags_ 0 def test_topmed_tag_parser(): parser = TOPMedTagParser() parse_file = str(TEST_DATA_DIR / "test_variables_v1.0.csv") - elements = parser.parse(parse_file) + elements = parser(parse_file) assert len(elements) == 62 diff --git a/tests/integration/test_search.py b/tests/integration/test_search.py index 1382b805..569db03c 100644 --- a/tests/integration/test_search.py +++ b/tests/integration/test_search.py @@ -3,7 +3,8 @@ import pytest from elasticsearch import Elasticsearch -from dug.core import Search +from dug.core.search import Search +from dug.config import Config def is_elastic_up(): @@ -23,7 +24,7 @@ def is_elastic_up(): http_auth=(username, password) ) return es.ping() - except Exception as _e: + except Exception: return False @@ -32,4 +33,4 @@ def test_search_init(): """ Tests if we can create a Search instance without it blowing up :D """ - Search(host=os.environ.get('ELASTIC_API_HOST'), port=9200,) + Search(cfg=Config.from_env()) diff --git a/tests/unit/test_annotate.py b/tests/unit/test_annotate.py index 8f121b2b..fe60604f 100644 --- a/tests/unit/test_annotate.py +++ b/tests/unit/test_annotate.py @@ -3,8 +3,8 @@ import pytest -from dug import config -from dug.annotate import Identifier, Preprocessor, Annotator, Normalizer, SynonymFinder, OntologyHelper +from dug.config import Config +from dug.core.annotate import Identifier, Preprocessor, Annotator, Normalizer, SynonymFinder, OntologyHelper def test_identifier(): @@ -31,9 +31,10 @@ def test_preprocessor_preprocess(preprocessor, input_text, expected_text): def test_annotator_init(): - url = config.annotator["url"] + cfg = Config.from_env() + url = cfg.annotator["url"] - annotator = Annotator(**config.annotator) + annotator = Annotator(**cfg.annotator) assert annotator.url == url diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py new file mode 100644 index 00000000..a1f624a2 --- /dev/null +++ b/tests/unit/test_config.py @@ -0,0 +1,17 @@ +import os +from unittest import mock + +from dug.config import Config + + +@mock.patch.dict(os.environ, { + "ELASTIC_PASSWORD": "ohwhoa", + 
"REDIS_PASSWORD": "thatsprettyneat", + "NBOOST_API_HOST": "gettinboosted!" +}) +def test_config_created_from_env_vars(): + cfg = Config.from_env() + + assert cfg.elastic_password == "ohwhoa" + assert cfg.redis_password == "thatsprettyneat" + assert cfg.nboost_host == "gettinboosted!" diff --git a/tests/unit/test_core/test_search.py b/tests/unit/test_core/test_search.py index c6e21719..b279ad83 100644 --- a/tests/unit/test_core/test_search.py +++ b/tests/unit/test_core/test_search.py @@ -4,7 +4,8 @@ import pytest -from dug.core import Search, SearchException +from dug.core.search import Search, SearchException +from dug.config import Config default_indices = ['concepts_index', 'variables_index', 'kg_index'] @@ -96,26 +97,21 @@ def search(self, index, body, **kwargs): @pytest.fixture def elastic(): - with patch('dug.core.Elasticsearch') as es_class: + with patch('dug.core.search.Elasticsearch') as es_class: es_instance = MockElastic(indices=MockIndices()) es_class.return_value = es_instance yield es_instance def test_init(elastic): + cfg = Config(elastic_host='localhost', + elastic_username='elastic', + elastic_password='hunter2', + nboost_host='localhost') - os.environ['ELASTIC_API_HOST'] = host - os.environ['ELASTIC_USERNAME'] = username - os.environ['ELASTIC_PASSWORD'] = password - os.environ['NBOOST_API_HOST'] = nboost_host - - search = Search() + search = Search(cfg) assert search.indices == default_indices - assert search.host == host - assert search.username == username - assert search.password == password - assert search.nboost_host == nboost_host assert search.hosts == hosts assert search.es is elastic @@ -123,11 +119,11 @@ def test_init(elastic): def test_init_no_ping(elastic): elastic.disconnect() with pytest.raises(SearchException): - _search = Search() + _search = Search(Config.from_env()) def test_init_indices(elastic): - search = Search() + search = Search(Config.from_env()) assert elastic.indices.call_count == 3 # Should take no action if called again @@ -136,7 +132,7 @@ def test_init_indices(elastic): def test_index_doc(elastic: MockElastic): - search = Search() + search = Search(Config.from_env()) assert len(elastic.indices.get_index('concepts_index').values) == 0 search.index_doc('concepts_index', {'name': 'sample'}, "ID:1") @@ -145,7 +141,7 @@ def test_index_doc(elastic: MockElastic): def test_update_doc(elastic: MockElastic): - search = Search() + search = Search(Config.from_env()) search.index_doc('concepts_index', {'name': 'sample'}, "ID:1") search.update_doc('concepts_index', {'name': 'new value!'}, "ID:1") diff --git a/tests/unit/test_parsers.py b/tests/unit/test_parsers.py index ab6dca8e..09e56412 100644 --- a/tests/unit/test_parsers.py +++ b/tests/unit/test_parsers.py @@ -1,5 +1,5 @@ -from dug.annotate import Identifier -from dug.parsers import DugElement, DugConcept +from dug.core.annotate import Identifier +from dug.core.parsers._base import DugElement, DugConcept def test_dug_concept():