diff --git a/data/bdc_dbgap_data_dicts.tar.gz b/data/bdc_dbgap_data_dicts.tar.gz index 7bac5cec..adf46cb3 100644 Binary files a/data/bdc_dbgap_data_dicts.tar.gz and b/data/bdc_dbgap_data_dicts.tar.gz differ diff --git a/docker-compose.yaml b/docker-compose.yaml index 17ca452c..1399e0af 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -29,7 +29,6 @@ services: depends_on: - elasticsearch - redis - - nboost restart: always networks: - dug-network @@ -87,18 +86,6 @@ services: ports: - '6379:6379' - ################################################################################# - ## - ## A scalable, search-engine-boosting platform for developing models to improve - ## search results. - ## - ################################################################################# - nboost: - image: koursaros/nboost:0.3.9-pt - networks: - - dug-network - ports: - - '8000:8000' networks: dug-network: diff --git a/requirements.txt b/requirements.txt index b9573b6a..ccbddaa9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,8 +12,8 @@ pyrsistent==0.17.3 pytest pytz==2021.1 PyYAML==6.0 -redis==4.4.2 -requests==2.28.2 +redis==4.4.4 +requests==2.31.0 requests-cache==0.9.8 six==1.16.0 diff --git a/src/dug/_version.py b/src/dug/_version.py index 3749919d..9c7c4147 100644 --- a/src/dug/_version.py +++ b/src/dug/_version.py @@ -1 +1 @@ -__version__ = "2.9.8dev" +__version__ = "2.9.9dev" \ No newline at end of file diff --git a/src/dug/config.py b/src/dug/config.py index 129ba080..6c40e8b2 100644 --- a/src/dug/config.py +++ b/src/dug/config.py @@ -96,9 +96,7 @@ def from_env(cls): "elastic_password": "ELASTIC_PASSWORD", "redis_host": "REDIS_HOST", "redis_port": "REDIS_PORT", - "redis_password": "REDIS_PASSWORD", - "nboost_host": "NBOOST_API_HOST", - "nboost_port": "NBOOST_API_PORT" + "redis_password": "REDIS_PASSWORD" } kwargs = {} @@ -107,5 +105,4 @@ def from_env(cls): env_value = os.environ.get(env_var) if env_value: kwargs[kwarg] = env_value - return cls(**kwargs) diff --git a/src/dug/core/__init__.py b/src/dug/core/__init__.py index 789ff5eb..f1fd8eda 100644 --- a/src/dug/core/__init__.py +++ b/src/dug/core/__init__.py @@ -1,3 +1,4 @@ +import asyncio import logging import os import sys @@ -10,7 +11,6 @@ from dug.core.loaders.network_loader import load_from_network from dug import hookspecs -from dug.config import Config from dug.core import parsers from dug.core.factory import DugFactory from dug.core.parsers import DugConcept, Parser, get_parser @@ -50,6 +50,11 @@ def __init__(self, factory: DugFactory): self._search = self._factory.build_search_obj(indices=[ self.concepts_index, self.variables_index, self.kg_index ]) + self._index = self._factory.build_indexer_obj( + indices=[ + self.concepts_index, self.variables_index, self.kg_index + ] + ) def crawl(self, target_name: str, parser_type: str, element_type: str = None): @@ -71,36 +76,36 @@ def _crawl(self, target: Path, parser: Parser, element_type): for element in crawler.elements: # Only index DugElements as concepts will be indexed differently in next step if not isinstance(element, DugConcept): - self._search.index_element(element, index=self.variables_index) + self._index.index_element(element, index=self.variables_index) # Index Annotated/TranQLized Concepts and associated knowledge graphs for concept_id, concept in crawler.concepts.items(): - self._search.index_concept(concept, index=self.concepts_index) + self._index.index_concept(concept, index=self.concepts_index) # Index knowledge graph answers for each concept for 
kg_answer_id, kg_answer in concept.kg_answers.items(): - self._search.index_kg_answer(concept_id=concept_id, + self._index.index_kg_answer(concept_id=concept_id, kg_answer=kg_answer, index=self.kg_index, id_suffix=kg_answer_id) def search(self, target, query, **kwargs): + event_loop = asyncio.get_event_loop() targets = { 'concepts': partial( self._search.search_concepts, index=kwargs.get('index', self.concepts_index)), 'variables': partial( self._search.search_variables, index=kwargs.get('index', self.variables_index), concept=kwargs.pop('concept', None)), 'kg': partial( - self._search.search_kg, index=kwargs.get('index', self.kg_index), unique_id=kwargs.pop('unique_id', None)), - 'nboost': partial( - self._search.search_nboost, index=kwargs.get('index', None)), + self._search.search_kg, index=kwargs.get('index', self.kg_index), unique_id=kwargs.pop('unique_id', None)) } kwargs.pop('index', None) func = targets.get(target) if func is None: raise ValueError(f"Target must be one of {', '.join(targets.keys())}") - - return func(query=query, **kwargs) + results = event_loop.run_until_complete(func(query=query, **kwargs)) + event_loop.run_until_complete(self._search.es.close()) + return results def status(self): ... diff --git a/src/dug/core/async_search.py b/src/dug/core/async_search.py index 15c52043..d0140f24 100644 --- a/src/dug/core/async_search.py +++ b/src/dug/core/async_search.py @@ -39,14 +39,14 @@ def __init__(self, cfg: Config, indices=None): logger.debug(f"Authenticating as user {self._cfg.elastic_username} to host:{self.hosts}") self.es = AsyncElasticsearch(hosts=self.hosts, - http_auth=(self._cfg.elastic_username, self._cfg.elastic_password)) + http_auth=(self._cfg.elastic_username, self._cfg.elastic_password)) async def dump_concepts(self, index, query={}, size=None, fuzziness=1, prefix_length=3): """ Get everything from concept index """ query = { - "match_all" : {} + "match_all": {} } body = {"query": query} await self.es.ping() @@ -54,9 +54,9 @@ async def dump_concepts(self, index, query={}, size=None, fuzziness=1, prefix_le counter = 0 all_docs = [] async for doc in async_scan( - client=self.es, - query=body, - index=index + client=self.es, + query=body, + index=index ): if counter == size and size != 0: break @@ -200,9 +200,9 @@ async def search_concepts(self, query, offset=0, size=None, fuzziness=1, prefix_ return search_results async def search_variables(self, concept="", query="", size=None, data_type=None, offset=0, fuzziness=1, - prefix_length=3): + prefix_length=3, index=None): """ - In variable seach, the concept MUST match one of the indentifiers in the list + In variable search, the concept MUST match one of the identifiers in the list The query can match search_terms (hence, "should") for ranking. 
         Results Return
@@ -326,9 +326,10 @@ async def search_variables(self, concept="", query="", size=None, data_type=None
                     "identifiers": concept
                 }
             }
-
-        body = json.dumps({'query': query})
-        total_items = await self.es.count(body=body, index="variables_index")
+        if index is None:
+            index = "variables_index"
+        body = {'query': query}
+        total_items = await self.es.count(body=body, index=index)
         search_results = await self.es.search(
             index="variables_index",
@@ -340,9 +341,9 @@ async def search_variables(self, concept="", query="", size=None, data_type=None
         # Reformat Results
         new_results = {}
         if not search_results:
-                # we don't want to error on a search not found
-                new_results.update({'total_items': total_items['count']})
-                return new_results
+            # we don't want to error on a search not found
+            new_results.update({'total_items': total_items['count']})
+            return new_results
 
         for elem in search_results['hits']['hits']:
             elem_s = elem['_source']
@@ -360,6 +361,186 @@ async def search_variables(self, concept="", query="", size=None, data_type=None
                 "score": round(elem['_score'], 6)
             }
 
+            # Case: collection not in dictionary for given data_type
+            if coll_id not in new_results[elem_type]:
+                # initialize document
+                doc = {
+                    'c_id': coll_id,
+                    'c_link': elem_s['collection_action'],
+                    'c_name': elem_s['collection_name'],
+                    'elements': [elem_info]
+                }
+                # save document
+                new_results[elem_type][coll_id] = doc
+
+            # Case: collection already in dictionary for given element_type; append elem_info. Assumes no duplicate
+            # elements
+            else:
+                new_results[elem_type][coll_id]['elements'].append(elem_info)
+
+        # Flatten dicts to list
+        for i in new_results:
+            new_results[i] = list(new_results[i].values())
+
+        # Return results
+        if bool(data_type):
+            if data_type in new_results:
+                new_results = new_results[data_type]
+            else:
+                new_results = {}
+        return new_results
+
+    async def search_vars_unscored(self, concept="", query="", size=None, data_type=None, offset=0, fuzziness=1,
+                                   prefix_length=3):
+        """
+        In variable search, the concept MUST match one of the identifiers in the list
+        The query can match search_terms (hence, "should") for ranking.
+
+        Results Return
+        The search result is returned in JSON format {collection_id:[elements]}
+
+        Filter
+        If a data_type is passed in, the result will be filtered to only contain
+        the passed-in data type.
+        """
+        query = {
+            'bool': {
+                'should': [
+                    {
+                        "match_phrase": {
+                            "element_name": {
+                                "query": query,
+                                "boost": 10
+                            }
+                        }
+                    },
+                    {
+                        "match_phrase": {
+                            "element_desc": {
+                                "query": query,
+                                "boost": 6
+                            }
+                        }
+                    },
+                    {
+                        "match_phrase": {
+                            "search_terms": {
+                                "query": query,
+                                "boost": 8
+                            }
+                        }
+                    },
+                    {
+                        "match": {
+                            "element_name": {
+                                "query": query,
+                                "fuzziness": fuzziness,
+                                "prefix_length": prefix_length,
+                                "operator": "and",
+                                "boost": 4
+                            }
+                        }
+                    },
+                    {
+                        "match": {
+                            "search_terms": {
+                                "query": query,
+                                "fuzziness": fuzziness,
+                                "prefix_length": prefix_length,
+                                "operator": "and",
+                                "boost": 5
+                            }
+                        }
+                    },
+                    {
+                        "match": {
+                            "element_desc": {
+                                "query": query,
+                                "fuzziness": fuzziness,
+                                "prefix_length": prefix_length,
+                                "operator": "and",
+                                "boost": 3
+                            }
+                        }
+                    },
+                    {
+                        "match": {
+                            "element_desc": {
+                                "query": query,
+                                "fuzziness": fuzziness,
+                                "prefix_length": prefix_length,
+                                "boost": 2
+                            }
+                        }
+                    },
+                    {
+                        "match": {
+                            "element_name": {
+                                "query": query,
+                                "fuzziness": fuzziness,
+                                "prefix_length": prefix_length,
+                                "boost": 2
+                            }
+                        }
+                    },
+                    {
+                        "match": {
+                            "search_terms": {
+                                "query": query,
+                                "fuzziness": fuzziness,
+                                "prefix_length": prefix_length,
+                                "boost": 1
+                            }
+                        }
+                    },
+                    {
+                        "match": {
+                            "optional_terms": {
+                                "query": query,
+                                "fuzziness": fuzziness,
+                                "prefix_length": prefix_length
+                            }
+                        }
+                    }
+                ]
+            }
+        }
+
+        if concept:
+            query['bool']['must'] = {
+                "match": {
+                    "identifiers": concept
+                }
+            }
+
+        body = {'query': query}
+        total_items = await self.es.count(body=body, index="variables_index")
+        search_results = []
+        async for r in async_scan(self.es,
+                                  query=body, index="variables_index"):
+            search_results.append(r)
+        # Reformat Results
+        new_results = {}
+        if not search_results:
+            # we don't want to error on a search not found
+            new_results.update({'total_items': total_items['count']})
+            return new_results
+
+        for elem in search_results:
+            elem_s = elem['_source']
+            elem_type = elem_s['data_type']
+            if elem_type not in new_results:
+                new_results[elem_type] = {}
+
+            elem_id = elem_s['element_id']
+            coll_id = elem_s['collection_id']
+            elem_info = {
+                "description": elem_s['element_desc'],
+                "e_link": elem_s['element_action'],
+                "id": elem_id,
+                "name": elem_s['element_name']
+            }
+
             # Case: collection not in dictionary for given data_type
             if coll_id not in new_results[elem_type]:
                 # initialize document
                 doc = {
                     'c_id': coll_id,
                     'c_link': elem_s['collection_action'],
                     'c_name': elem_s['collection_name'],
                     'elements': [elem_info]
                 }
                 # save document
                 new_results[elem_type][coll_id] = doc
@@ -413,7 +594,7 @@ async def search_kg(self, unique_id, query, offset=0, size=None, fuzziness=1, pr
                 ]
             }
         }
-        body = json.dumps({'query': query})
+        body = {'query': query}
         total_items = await self.es.count(body=body, index="kg_index")
         search_results = await self.es.search(
             index="kg_index",
             body=body,
@@ -424,4 +605,3 @@ async def search_kg(self, unique_id, query, offset=0, size=None, fuzziness=1, pr
         )
         search_results.update({'total_items': total_items['count']})
         return search_results
-
diff --git a/src/dug/core/factory.py b/src/dug/core/factory.py
index 3c2e16d2..48097f19 100644
--- a/src/dug/core/factory.py
+++ b/src/dug/core/factory.py
@@ -9,7 +9,8 @@
 from dug.config import Config as DugConfig, TRANQL_SOURCE
 from dug.core.crawler import Crawler
 from dug.core.parsers import Parser
-from dug.core.search import Search
+from dug.core.async_search import Search
+from dug.core.index import Index
 
 
 class DugFactory:
@@ -80,6 +81,9 @@ def build_tranql_queries(self, source=None) -> Dict[str, tql.QueryFactory]:
     def build_search_obj(self, indices) -> Search:
         return Search(self.config, indices=indices)
 
+    def build_indexer_obj(self, indices) -> Index:
+        return Index(self.config, indices=indices)
+
     def build_element_extraction_parameters(self, source=None):
         # Method reformats the node_to_element_queries object
         # Uses tranql source use for concept crawling
diff --git a/src/dug/core/index.py b/src/dug/core/index.py
new file mode 100644
index 00000000..a6711a15
--- /dev/null
+++ b/src/dug/core/index.py
@@ -0,0 +1,227 @@
+"""
+This class is used for adding documents to an Elasticsearch index
+"""
+import logging
+
+from elasticsearch import Elasticsearch
+
+from dug.config import Config
+
+logger = logging.getLogger('dug')
+
+
+class Index:
+    def __init__(self, cfg: Config, indices=None):
+
+        if indices is None:
+            indices = ['concepts_index', 'variables_index', 'kg_index']
+
+        self._cfg = cfg
+        logger.debug(f"Connecting to elasticsearch host: {self._cfg.elastic_host} at port: {self._cfg.elastic_port}")
+
+        self.indices = indices
+        self.hosts = [{'host': self._cfg.elastic_host, 'port': self._cfg.elastic_port}]
+
+        logger.debug(f"Authenticating as user {self._cfg.elastic_username} to host:{self.hosts}")
+
+        self.es = Elasticsearch(hosts=self.hosts,
+                                http_auth=(self._cfg.elastic_username, self._cfg.elastic_password))
+
+        if self.es.ping():
+            logger.info('connected to elasticsearch')
+            self.init_indices()
+        else:
+            print(f"Unable to connect to elasticsearch at {self._cfg.elastic_host}:{self._cfg.elastic_port}")
+            logger.error(f"Unable to connect to elasticsearch at {self._cfg.elastic_host}:{self._cfg.elastic_port}")
+            raise SearchException(
+                message='failed to connect to elasticsearch',
+                details=f"connecting to host {self._cfg.elastic_host} and port {self._cfg.elastic_port}")
+
+    def init_indices(self):
+        # The concepts and variable indices include an analyzer that utilizes the English
+        # stopword facility from Elasticsearch. We also instruct each of the text mappings
+        # to use this analyzer. Note that we have not upgraded the kg index, because the fields
+        # in that index are primarily dynamic.
We could eventually either add mappings so that + # the fields are no longer dynamic or we could use the dynamic template capabilities + # described in + # https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html + + kg_index = { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "type": { + "type": "text" + } + } + } + } + concepts_index = { + "settings": { + "index.mapping.coerce": "false", + "number_of_shards": 1, + "number_of_replicas": 0, + "analysis": { + "analyzer": { + "std_with_stopwords": { + "type": "standard", + "stopwords": "_english_" + } + } + } + }, + "mappings": { + "dynamic": "strict", + "properties": { + "id": {"type": "text", "analyzer": "std_with_stopwords", + "fields": {"keyword": {"type": "keyword"}}}, + "name": {"type": "text", "analyzer": "std_with_stopwords"}, + "description": {"type": "text", "analyzer": "std_with_stopwords"}, + "type": {"type": "keyword"}, + "search_terms": {"type": "text", "analyzer": "std_with_stopwords"}, + "identifiers": { + "properties": { + "id": {"type": "text", "analyzer": "std_with_stopwords", + "fields": {"keyword": {"type": "keyword"}}}, + "label": {"type": "text", "analyzer": "std_with_stopwords"}, + "equivalent_identifiers": {"type": "keyword"}, + "type": {"type": "keyword"}, + "synonyms": {"type": "text", "analyzer": "std_with_stopwords"} + } + }, + "optional_terms": {"type": "text", "analyzer": "std_with_stopwords"}, + "concept_action": {"type": "text", "analyzer": "std_with_stopwords"} + } + } + } + variables_index = { + "settings": { + "index.mapping.coerce": "false", + "number_of_shards": 1, + "number_of_replicas": 0, + "analysis": { + "analyzer": { + "std_with_stopwords": { + "type": "standard", + "stopwords": "_english_" + } + } + } + }, + "mappings": { + "dynamic": "strict", + "properties": { + "element_id": {"type": "text", "analyzer": "std_with_stopwords", + "fields": {"keyword": {"type": "keyword"}}}, + "element_name": {"type": "text", "analyzer": "std_with_stopwords"}, + "element_desc": {"type": "text", "analyzer": "std_with_stopwords"}, + "element_action": {"type": "text", "analyzer": "std_with_stopwords"}, + "search_terms": {"type": "text", "analyzer": "std_with_stopwords"}, + "optional_terms": {"type": "text", "analyzer": "std_with_stopwords"}, + "identifiers": {"type": "keyword"}, + "collection_id": {"type": "text", "analyzer": "std_with_stopwords", + "fields": {"keyword": {"type": "keyword"}}}, + "collection_name": {"type": "text", "analyzer": "std_with_stopwords"}, + "collection_desc": {"type": "text", "analyzer": "std_with_stopwords"}, + "collection_action": {"type": "text", "analyzer": "std_with_stopwords"}, + "data_type": {"type": "text", "analyzer": "std_with_stopwords", + "fields": {"keyword": {"type": "keyword"}}} + # typed as keyword for bucket aggs + } + } + } + + settings = { + 'kg_index': kg_index, + 'concepts_index': concepts_index, + 'variables_index': variables_index, + } + + logger.info(f"creating indices") + logger.debug(self.indices) + for index in self.indices: + try: + if self.es.indices.exists(index=index): + logger.info(f"Ignoring index {index} which already exists.") + else: + result = self.es.indices.create( + index=index, + body=settings[index], + ignore=400) + logger.info(f"result created index {index}: {result}") + except Exception as e: + logger.error(f"exception: {e}") + raise e + + def index_doc(self, index, doc, doc_id): + self.es.index( + index=index, + id=doc_id, + 
body=doc)
+
+    def update_doc(self, index, doc, doc_id):
+        self.es.update(
+            index=index,
+            id=doc_id,
+            body=doc
+        )
+
+    def index_concept(self, concept, index):
+        # Don't re-index if already in index
+        if self.es.exists(index, concept.id):
+            return
+        """ Index the document. """
+        self.index_doc(
+            index=index,
+            doc=concept.get_searchable_dict(),
+            doc_id=concept.id)
+
+    def index_element(self, elem, index):
+        if not self.es.exists(index, elem.id):
+            # If the element doesn't exist, add it directly
+            self.index_doc(
+                index=index,
+                doc=elem.get_searchable_dict(),
+                doc_id=elem.id)
+        else:
+            # Otherwise update to add any new identifiers that weren't there last time around
+            results = self.es.get(index, elem.id)
+            identifiers = results['_source']['identifiers'] + list(elem.concepts.keys())
+            doc = {"doc": {}}
+            doc['doc']['identifiers'] = list(set(identifiers))
+            self.update_doc(index=index, doc=doc, doc_id=elem.id)
+
+    def index_kg_answer(self, concept_id, kg_answer, index, id_suffix=None):
+
+        # Get search targets by extracting names/synonyms from non-curie nodes in answer knowledge graph
+        search_targets = kg_answer.get_node_names(include_curie=False)
+        search_targets += kg_answer.get_node_synonyms(include_curie=False)
+
+        # Create the Doc
+        doc = {
+            'concept_id': concept_id,
+            'search_targets': list(set(search_targets)),
+            'knowledge_graph': kg_answer.get_kg()
+        }
+
+        # Create unique ID
+        logger.debug("Indexing TranQL query answer...")
+        id_suffix = list(kg_answer.nodes.keys()) if id_suffix is None else id_suffix
+        unique_doc_id = f"{concept_id}_{id_suffix}"
+
+        """ Index the document. """
+        self.index_doc(
+            index=index,
+            doc=doc,
+            doc_id=unique_doc_id)
+
+class SearchException(Exception):
+    def __init__(self, message, details):
+        self.message = message
+        self.details = details
\ No newline at end of file
diff --git a/src/dug/core/search.py b/src/dug/core/search.py
deleted file mode 100644
index 26a7035f..00000000
--- a/src/dug/core/search.py
+++ /dev/null
@@ -1,645 +0,0 @@
-###
-# Deprication Notice:
-# New Changes to search and indexing should be made in the async flavor of dug.
-# see : ./async_search.py
-###
-
-import json
-import logging
-
-import requests
-from elasticsearch import Elasticsearch
-
-from dug.config import Config
-
-logger = logging.getLogger('dug')
-
-
-class Search:
-    """ Search
-
-    1. Lexical fuzziness; (a) misspellings - a function of elastic.
-    2.
Fuzzy ontologically; - (a) expand based on core queries - * phenotype->study - * phenotype->disease->study - * disease->study - * disease->phenotype->study - """ - - def __init__(self, cfg: Config, indices=None): - - if indices is None: - indices = ['concepts_index', 'variables_index', 'kg_index'] - - self._cfg = cfg - logger.debug(f"Connecting to elasticsearch host: {self._cfg.elastic_host} at port: {self._cfg.elastic_port}") - - self.indices = indices - self.hosts = [{'host': self._cfg.elastic_host, 'port': self._cfg.elastic_port}] - - logger.debug(f"Authenticating as user {self._cfg.elastic_username} to host:{self.hosts}") - - self.es = Elasticsearch(hosts=self.hosts, - http_auth=(self._cfg.elastic_username, self._cfg.elastic_password)) - - if self.es.ping(): - logger.info('connected to elasticsearch') - self.init_indices() - else: - print(f"Unable to connect to elasticsearch at {self._cfg.elastic_host}:{self._cfg.elastic_port}") - logger.error(f"Unable to connect to elasticsearch at {self._cfg.elastic_host}:{self._cfg.elastic_port}") - raise SearchException( - message='failed to connect to elasticsearch', - details=f"connecting to host {self._cfg.elastic_host} and port {self._cfg.elastic_port}") - - def init_indices(self): - # The concepts and variable indices include an analyzer that utilizes the english - # stopword facility from elastic search. We also instruct each of the text mappings - # to use this analyzer. Note that we have not upgraded the kg index, because the fields - # in that index are primarily dynamic. We could eventually either add mappings so that - # the fields are no longer dynamic or we could use the dynamic template capabilities - # described in - # https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-templates.html - - kg_index = { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 0 - }, - "mappings": { - "properties": { - "name": { - "type": "text" - }, - "type": { - "type": "text" - } - } - } - } - concepts_index = { - "settings": { - "index.mapping.coerce": "false", - "number_of_shards": 1, - "number_of_replicas": 0, - "analysis": { - "analyzer": { - "std_with_stopwords": { - "type": "standard", - "stopwords": "_english_" - } - } - } - }, - "mappings": { - "dynamic": "strict", - "properties": { - "id": {"type": "text", "analyzer": "std_with_stopwords", "fields": {"keyword": {"type": "keyword"}}}, - "name": {"type": "text", "analyzer": "std_with_stopwords"}, - "description": {"type": "text", "analyzer": "std_with_stopwords"}, - "type": {"type": "keyword"}, - "search_terms": {"type": "text", "analyzer": "std_with_stopwords"}, - "identifiers": { - "properties": { - "id": {"type": "text", "analyzer": "std_with_stopwords", "fields": {"keyword": {"type": "keyword"}}}, - "label": {"type": "text", "analyzer": "std_with_stopwords"}, - "equivalent_identifiers": {"type": "keyword"}, - "type": {"type": "keyword"}, - "synonyms": {"type": "text", "analyzer": "std_with_stopwords"} - } - }, - "optional_terms": {"type": "text", "analyzer": "std_with_stopwords"}, - "concept_action": {"type": "text", "analyzer": "std_with_stopwords"} - } - } - } - variables_index = { - "settings": { - "index.mapping.coerce": "false", - "number_of_shards": 1, - "number_of_replicas": 0, - "analysis": { - "analyzer": { - "std_with_stopwords": { - "type": "standard", - "stopwords": "_english_" - } - } - } - }, - "mappings": { - "dynamic": "strict", - "properties": { - "element_id": {"type": "text", "analyzer": "std_with_stopwords", "fields": {"keyword": {"type": 
"keyword"}}}, - "element_name": {"type": "text", "analyzer": "std_with_stopwords"}, - "element_desc": {"type": "text", "analyzer": "std_with_stopwords"}, - "element_action": {"type": "text", "analyzer": "std_with_stopwords"}, - "search_terms": {"type": "text", "analyzer": "std_with_stopwords"}, - "optional_terms": {"type": "text", "analyzer": "std_with_stopwords"}, - "identifiers": {"type": "keyword"}, - "collection_id": {"type": "text", "analyzer": "std_with_stopwords", "fields": {"keyword": {"type": "keyword"}}}, - "collection_name": {"type": "text", "analyzer": "std_with_stopwords"}, - "collection_desc": {"type": "text", "analyzer": "std_with_stopwords"}, - "collection_action": {"type": "text", "analyzer": "std_with_stopwords"}, - "data_type": {"type": "text", "analyzer": "std_with_stopwords", "fields": {"keyword": {"type": "keyword"}}} - # typed as keyword for bucket aggs - } - } - } - - settings = { - 'kg_index': kg_index, - 'concepts_index': concepts_index, - 'variables_index': variables_index, - } - - logger.info(f"creating indices") - logger.debug(self.indices) - for index in self.indices: - try: - if self.es.indices.exists(index=index): - logger.info(f"Ignoring index {index} which already exists.") - else: - result = self.es.indices.create( - index=index, - body=settings[index], - ignore=400) - logger.info(f"result created index {index}: {result}") - except Exception as e: - logger.error(f"exception: {e}") - raise e - - def index_doc(self, index, doc, doc_id): - self.es.index( - index=index, - id=doc_id, - body=doc) - - def update_doc(self, index, doc, doc_id): - self.es.update( - index=index, - id=doc_id, - body=doc - ) - - def dump_concepts(self, index, query={}, offset=0, size=None, fuzziness=1, prefix_length=3): - """ - Get everything from concept index - """ - query = { - "match_all" : {} - } - - body = json.dumps({'query': query}) - total_items = self.es.count(body=body, index=index) - search_results = self.es.search( - index=index, - body=body, - filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'], - from_=offset, - size=size - ) - search_results.update({'total_items': total_items['count']}) - return search_results - - def search_concepts(self, index, query, offset=0, size=None, fuzziness=1, prefix_length=3): - """ - Changed to a long boolean match query to optimize search results - """ - query = { - "bool": { - "should": [ - { - "match_phrase": { - "name": { - "query": query, - "boost": 10 - } - } - }, - { - "match_phrase": { - "description": { - "query": query, - "boost": 6 - } - } - }, - { - "match_phrase": { - "search_terms": { - "query": query, - "boost": 8 - } - } - }, - { - "match": { - "name": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "operator": "and", - "boost": 4 - } - } - }, - { - "match": { - "search_terms": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "operator": "and", - "boost": 5 - } - } - }, - { - "match": { - "description": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "operator": "and", - "boost": 3 - } - } - }, - { - "match": { - "description": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "boost": 2 - } - } - }, - { - "match": { - "search_terms": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "boost": 1 - } - } - }, - { - "match": { - "optional_terms": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length - } - } - } - ] - } - } 
- body = json.dumps({'query': query}) - total_items = self.es.count(body=body, index=index) - search_results = self.es.search( - index=index, - body=body, - filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'], - from_=offset, - size=size - ) - search_results.update({'total_items': total_items['count']}) - return search_results - - def search_variables(self, index, concept="", query="", size=None, data_type=None, offset=0, fuzziness=1, - prefix_length=3): - """ - In variable seach, the concept MUST match one of the indentifiers in the list - The query can match search_terms (hence, "should") for ranking. - - Results Return - The search result is returned in JSON format {collection_id:[elements]} - - Filter - If a data_type is passed in, the result will be filtered to only contain - the passed-in data type. - """ - query = { - 'bool': { - 'should': { - "match": { - "identifiers": concept - } - }, - 'should': [ - { - "match_phrase": { - "element_name": { - "query": query, - "boost": 10 - } - } - }, - { - "match_phrase": { - "element_desc": { - "query": query, - "boost": 6 - } - } - }, - { - "match_phrase": { - "search_terms": { - "query": query, - "boost": 8 - } - } - }, - { - "match": { - "element_name": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "operator": "and", - "boost": 4 - } - } - }, - { - "match": { - "search_terms": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "operator": "and", - "boost": 5 - } - } - }, - { - "match": { - "element_desc": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "operator": "and", - "boost": 3 - } - } - }, - { - "match": { - "element_desc": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "boost": 2 - } - } - }, - { - "match": { - "element_name": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "boost": 2 - } - } - }, - { - "match": { - "search_terms": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length, - "boost": 1 - } - } - }, - { - "match": { - "optional_terms": { - "query": query, - "fuzziness": fuzziness, - "prefix_length": prefix_length - } - } - } - ] - } - } - - if concept: - query['bool']['must'] = { - "match": { - "identifiers": concept - } - } - - body = json.dumps({'query': query}) - total_items = self.es.count(body=body, index=index) - search_results = self.es.search( - index=index, - body=body, - filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source', 'hits.hits._score'], - from_=offset, - size=size - ) - - # Reformat Results - new_results = {} - if not search_results: - # we don't want to error on a search not found - new_results.update({'total_items': total_items['count']}) - return new_results - - for elem in search_results['hits']['hits']: - elem_s = elem['_source'] - elem_type = elem_s['data_type'] - if elem_type not in new_results: - new_results[elem_type] = {} - - elem_id = elem_s['element_id'] - coll_id = elem_s['collection_id'] - elem_info = { - "description": elem_s['element_desc'], - "e_link": elem_s['element_action'], - "id": elem_id, - "name": elem_s['element_name'], - "score": round(elem['_score'], 6) - } - - # Case: collection not in dictionary for given data_type - if coll_id not in new_results[elem_type]: - # initialize document - doc = {} - - # add information - doc['c_id'] = coll_id - doc['c_link'] = elem_s['collection_action'] - doc['c_name'] = elem_s['collection_name'] - doc['elements'] = 
[elem_info] - - # save document - new_results[elem_type][coll_id] = doc - - # Case: collection already in dictionary for given element_type; append elem_info. Assumes no duplicate elements - else: - new_results[elem_type][coll_id]['elements'].append(elem_info) - - # Flatten dicts to list - for i in new_results: - new_results[i] = list(new_results[i].values()) - - # Return results - if bool(data_type): - if data_type in new_results: - new_results = new_results[data_type] - else: - new_results = {} - return new_results - - def agg_data_type(self, index, size=0): - """ - In variable seach, the concept MUST match one of the indentifiers in the list - The query can match search_terms (hence, "should") for ranking. - """ - aggs = { - "data_type": { - "terms": { - "field": "data_type.keyword", - "size": 100 - } - } - } - body = json.dumps({'aggs': aggs}) - - search_results = self.es.search( - index=index, - body=body, - size=size - ) - data_type_list = [data_type['key'] for data_type in search_results['aggregations']['data_type']['buckets']] - search_results.update({'data type list': data_type_list}) - return data_type_list - - def search_kg(self, index, unique_id, query, offset=0, size=None, fuzziness=1, prefix_length=3): - """ - In knowledge graph search seach, the concept MUST match the unique ID - The query MUST match search_targets. The updated query allows for - fuzzy matching and for the default OR behavior for the query. - """ - query = { - "bool": { - "must": [ - {"term": { - "concept_id.keyword": unique_id - } - }, - {'query_string': { - "query": query, - "fuzziness": fuzziness, - "fuzzy_prefix_length": prefix_length, - "default_field": "search_targets" - } - } - ] - } - } - body = json.dumps({'query': query}) - total_items = self.es.count(body=body, index=index) - search_results = self.es.search( - index=index, - body=body, - filter_path=['hits.hits._id', 'hits.hits._type', 'hits.hits._source'], - from_=offset, - size=size - ) - search_results.update({'total_items': total_items['count']}) - return search_results - - def search_nboost(self, index, query, offset=0, size=10, fuzziness=1): - """ - Query type is now 'query_string'. - query searches multiple fields - if search terms are surrounded in quotes, looks for exact matches in any of the fields - AND/OR operators are natively supported by elasticesarch queries - """ - nboost_query = { - 'nboost': { - 'uhost': f"{self._cfg.elastic_username}:{self._cfg.elastic_password}@{self._cfg.elastic_host}", - 'uport': self._cfg.elastic_port, - 'cvalues_path': '_source.description', - 'query_path': 'body.query.query_string.query', - 'size': size, - 'from': offset, - 'default_topk': size - }, - 'query': { - 'query_string': { - 'query': query, - 'fuzziness': fuzziness, - 'fields': ['name', 'description', 'instructions', 'search_targets', 'optional_targets'], - 'quote_field_suffix': ".exact" - } - } - } - - return requests.post(url=f"http://{self._cfg.nboost_host}:{self._cfg.nboost_port}/{index}/_search", json=nboost_query).json() - - def index_concept(self, concept, index): - # Don't re-index if already in index - if self.es.exists(index, concept.id): - return - """ Index the document. 
""" - self.index_doc( - index=index, - doc=concept.get_searchable_dict(), - doc_id=concept.id) - - def index_element(self, elem, index): - if not self.es.exists(index, elem.id): - # If the element doesn't exist, add it directly - self.index_doc( - index=index, - doc=elem.get_searchable_dict(), - doc_id=elem.id) - else: - # Otherwise update to add any new identifiers that weren't there last time around - results = self.es.get(index, elem.id) - identifiers = results['_source']['identifiers'] + list(elem.concepts.keys()) - doc = {"doc": {}} - doc['doc']['identifiers'] = list(set(identifiers)) - self.update_doc(index=index, doc=doc, doc_id=elem.id) - - def index_kg_answer(self, concept_id, kg_answer, index, id_suffix=None): - - # Get search targets by extracting names/synonyms from non-curie nodes in answer knoweldge graph - search_targets = kg_answer.get_node_names(include_curie=False) - search_targets += kg_answer.get_node_synonyms(include_curie=False) - - # Create the Doc - doc = { - 'concept_id': concept_id, - 'search_targets': list(set(search_targets)), - 'knowledge_graph': kg_answer.get_kg() - } - - # Create unique ID - logger.debug("Indexing TranQL query answer...") - id_suffix = list(kg_answer.nodes.keys()) if id_suffix is None else id_suffix - unique_doc_id = f"{concept_id}_{id_suffix}" - - """ Index the document. """ - self.index_doc( - index=index, - doc=doc, - doc_id=unique_doc_id) - - -class SearchException(Exception): - def __init__(self, message, details): - self.message = message - self.details = details diff --git a/tests/integration/test_search.py b/tests/integration/test_index.py similarity index 94% rename from tests/integration/test_search.py rename to tests/integration/test_index.py index 569db03c..31d0d3db 100644 --- a/tests/integration/test_search.py +++ b/tests/integration/test_index.py @@ -3,7 +3,7 @@ import pytest from elasticsearch import Elasticsearch -from dug.core.search import Search +from dug.core.async_search import Search from dug.config import Config diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index a1f624a2..dd54bb2d 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -6,12 +6,10 @@ @mock.patch.dict(os.environ, { "ELASTIC_PASSWORD": "ohwhoa", - "REDIS_PASSWORD": "thatsprettyneat", - "NBOOST_API_HOST": "gettinboosted!" + "REDIS_PASSWORD": "thatsprettyneat" }) def test_config_created_from_env_vars(): cfg = Config.from_env() assert cfg.elastic_password == "ohwhoa" assert cfg.redis_password == "thatsprettyneat" - assert cfg.nboost_host == "gettinboosted!" 
diff --git a/tests/unit/test_core/test_search.py b/tests/unit/test_core/test_search.py index b279ad83..bf511e2c 100644 --- a/tests/unit/test_core/test_search.py +++ b/tests/unit/test_core/test_search.py @@ -4,7 +4,7 @@ import pytest -from dug.core.search import Search, SearchException +from dug.core.index import Index, SearchException from dug.config import Config default_indices = ['concepts_index', 'variables_index', 'kg_index'] @@ -97,7 +97,7 @@ def search(self, index, body, **kwargs): @pytest.fixture def elastic(): - with patch('dug.core.search.Elasticsearch') as es_class: + with patch('dug.core.index.Elasticsearch') as es_class: es_instance = MockElastic(indices=MockIndices()) es_class.return_value = es_instance yield es_instance @@ -109,7 +109,7 @@ def test_init(elastic): elastic_password='hunter2', nboost_host='localhost') - search = Search(cfg) + search = Index(cfg) assert search.indices == default_indices assert search.hosts == hosts @@ -119,11 +119,11 @@ def test_init(elastic): def test_init_no_ping(elastic): elastic.disconnect() with pytest.raises(SearchException): - _search = Search(Config.from_env()) + _search = Index(Config.from_env()) - -def test_init_indices(elastic): - search = Search(Config.from_env()) +@pytest.mark.asyncio +async def test_init_indices(elastic): + search = Index(Config.from_env()) assert elastic.indices.call_count == 3 # Should take no action if called again @@ -132,7 +132,7 @@ def test_init_indices(elastic): def test_index_doc(elastic: MockElastic): - search = Search(Config.from_env()) + search = Index(Config.from_env()) assert len(elastic.indices.get_index('concepts_index').values) == 0 search.index_doc('concepts_index', {'name': 'sample'}, "ID:1") @@ -141,7 +141,7 @@ def test_index_doc(elastic: MockElastic): def test_update_doc(elastic: MockElastic): - search = Search(Config.from_env()) + search = Index(Config.from_env()) search.index_doc('concepts_index', {'name': 'sample'}, "ID:1") search.update_doc('concepts_index', {'name': 'new value!'}, "ID:1")
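
Reviewer note: since the 'nboost' target and search_nboost are removed, callers pick from the remaining search targets. A short sketch of the surviving call path follows; it assumes DugFactory is constructed from a Config, which this diff does not show.

```python
from dug.config import Config
from dug.core import Dug
from dug.core.factory import DugFactory

# Assumption: DugFactory takes a Config; its constructor is outside this diff.
dug = Dug(DugFactory(Config.from_env()))

# Valid targets are now 'concepts', 'variables', and 'kg'; anything else,
# including the removed 'nboost', raises ValueError in Dug.search().
results = dug.search("concepts", "asthma")

# Note: Dug.search() closes the shared async client after each query, so the
# code as written supports one search per Dug instance.
```
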