From a29f377e8959df2fc9fc53f1a2cc45a1afaa91de Mon Sep 17 00:00:00 2001 From: Kristoffer Andersson Date: Tue, 20 Dec 2022 21:15:00 +0100 Subject: [PATCH 1/5] refactor: rename config_handler --- .../elasticsearch6/{config_handler.py => es_mapping_repo.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename karp/search_infrastructure/elasticsearch6/{config_handler.py => es_mapping_repo.py} (100%) diff --git a/karp/search_infrastructure/elasticsearch6/config_handler.py b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py similarity index 100% rename from karp/search_infrastructure/elasticsearch6/config_handler.py rename to karp/search_infrastructure/elasticsearch6/es_mapping_repo.py From 4f217bae1d0f422fd0322f4d336bd0ebce8f3ed7 Mon Sep 17 00:00:00 2001 From: Kristoffer Andersson Date: Thu, 22 Dec 2022 08:14:43 +0100 Subject: [PATCH 2/5] fix: add es6 mapping repo --- karp/search_infrastructure/__init__.py | 16 +++++- .../elasticsearch6/__init__.py | 17 +----- .../elasticsearch6/es_mapping_repo.py | 52 +++++++++++-------- .../queries/es6_search_service.py | 8 ++- .../repositories/es6_indicies.py | 17 +++++- 5 files changed, 70 insertions(+), 40 deletions(-) diff --git a/karp/search_infrastructure/__init__.py b/karp/search_infrastructure/__init__.py index db86d4f2..4401d2b8 100644 --- a/karp/search_infrastructure/__init__.py +++ b/karp/search_infrastructure/__init__.py @@ -41,6 +41,7 @@ NoOpIndexUnitOfWork, Es6IndexUnitOfWork, ) +from karp.search_infrastructure.elasticsearch6 import Es6MappingRepository logger = logging.getLogger(__name__) @@ -116,21 +117,34 @@ class Es6SearchIndexMod(injector.Module): def __init__(self, index_prefix: Optional[str] = None) -> None: self._index_prefix = index_prefix or "" + @injector.provider + def es6_mapping_repo( + self, + es: elasticsearch.Elasticsearch, + ) -> Es6MappingRepository: + return Es6MappingRepository(es=es) + @injector.provider def es6_search_service( self, es: elasticsearch.Elasticsearch, + mapping_repo: Es6MappingRepository, ) -> SearchService: - return Es6SearchService(es=es) + return Es6SearchService( + es=es, + mapping_repo=mapping_repo, + ) @injector.provider def es6_index_uow( self, es: elasticsearch.Elasticsearch, event_bus: EventBus, + mapping_repo: Es6MappingRepository, ) -> IndexUnitOfWork: return Es6IndexUnitOfWork( es=es, event_bus=event_bus, index_prefix=self._index_prefix, + mapping_repo=mapping_repo, ) diff --git a/karp/search_infrastructure/elasticsearch6/__init__.py b/karp/search_infrastructure/elasticsearch6/__init__.py index 363d510e..69578ba9 100644 --- a/karp/search_infrastructure/elasticsearch6/__init__.py +++ b/karp/search_infrastructure/elasticsearch6/__init__.py @@ -1,17 +1,4 @@ -# from elasticsearch import Elasticsearch # pyre-ignore +from karp.search_infrastructure.elasticsearch6.es_mapping_repo import Es6MappingRepository -# from karp.application import ctx -# from karp.infrastructure.elasticsearch6.es6_search_service import Es6SearchService - -# def init_es(host): -# print("Setting up ES with host={}".format(host)) -# es = Elasticsearch( -# hosts=host, -# sniff_on_start=True, -# sniff_on_connection_fail=True, -# sniffer_timeout=60, -# sniff_timeout=10, -# ) -# search_service = Es6SearchService(es) -# ctx.search_service = search_service +__all__ = ["Es6MappingRepository"] \ No newline at end of file diff --git a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py index 02de2b1e..63932613 100644 --- a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py +++ b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py @@ -23,22 +23,28 @@ KARP_CONFIGINDEX_TYPE = "configs" -class ConfigRepository(abc.ABC): +class MappingRepository(abc.ABC): pass -class Es6ConfigRepository(Index): - def __init__(self, es: elasticsearch.Elasticsearch): - self.es: elasticsearch.Elasticsearch = es +class Es6MappingRepository(MappingRepository): + def __init__( + self, + es: elasticsearch.Elasticsearch, + prefix: str = "", + ): + self.es = es + self._prefix = prefix + self._config_index = f"{prefix}_config" if prefix else KARP_CONFIGINDEX self.ensure_config_index_exist() analyzed_fields, sortable_fields = self._init_field_mapping() self.analyzed_fields: Dict[str, List[str]] = analyzed_fields self.sortable_fields: Dict[str, Dict[str, List[str]]] = sortable_fields def ensure_config_index_exist(self) -> None: - if not self.es.indices.exists(index=KARP_CONFIGINDEX): + if not self.es.indices.exists(index=self._config_index): self.es.indices.create( - index=KARP_CONFIGINDEX, + index=self._config_index, body={ "settings": { "number_of_shards": 1, @@ -48,17 +54,16 @@ def ensure_config_index_exist(self) -> None: "mappings": { KARP_CONFIGINDEX_TYPE: { "dynamic": False, - "properties": {"index_name": {"type": "text"}}, + "properties": { + "index_name": {"type": "text"}, + "alias_name": {"type": "text"} + }, } }, }, ) - @property - def seen(self): - return [] - - def create_index(self, resource_id, config): + def create_index(self, resource_id: str, config): print("creating es mapping ...") mapping = create_es6_mapping(config) @@ -78,9 +83,7 @@ def create_index(self, resource_id, config): "mappings": {"entry": mapping}, } - date = datetime.now().strftime("%Y-%m-%d-%H%M%S%f") - index_name = resource_id + "_" + date - print(f"creating index '{index_name}' ...") + logger.debug(f"creating index '{index_name}' ...") result = self.es.indices.create(index=index_name, body=body) if "error" in result: print("failed to create index") @@ -89,25 +92,32 @@ def create_index(self, resource_id, config): self._set_index_name_for_resource(resource_id, index_name) return index_name - def _set_index_name_for_resource(self, resource_id: str, index_name: str) -> str: + def create_index_name(self, resource_id: str) -> str: + date = datetime.now().strftime("%Y-%m-%d-%H%M%S%f") + return f"{self._prefix}{resource_id}_{date}" + + def create_alias_name(self, resource_id: str) -> str: + return f"{self._prefix}{resource_id}" + + def set_index_name_for_resource(self, resource_id: str, index_name: str) -> str: self.es.index( - index=KARP_CONFIGINDEX, + index=self._config_index, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE, body={"index_name": index_name}, ) return index_name - def _get_index_name_for_resource(self, resource_id: str) -> str: + def get_index_name_for_resource(self, resource_id: str) -> str: try: res = self.es.get( - index=KARP_CONFIGINDEX, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE + index=self._config_index, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE ) except es_exceptions.NotFoundError as err: - logger.debug( + logger.error( "didn't find index_name for resource '%s' details: %s", resource_id, err ) - return self._set_index_name_for_resource(resource_id, resource_id) + return self.set_index_name_for_resource(resource_id, self.create_index_name(resource_id)) return res["_source"]["index_name"] def publish_index(self, resource_id: str): diff --git a/karp/search_infrastructure/queries/es6_search_service.py b/karp/search_infrastructure/queries/es6_search_service.py index d476d21b..dab3f67f 100644 --- a/karp/search_infrastructure/queries/es6_search_service.py +++ b/karp/search_infrastructure/queries/es6_search_service.py @@ -23,6 +23,7 @@ from karp.lex.domain.entities.resource import Resource from karp.search.domain import query_dsl from karp.search_infrastructure.elasticsearch6 import es_config +from karp.search_infrastructure.elasticsearch6 import Es6MappingRepository from .es_query import EsQuery # from karp.query_dsl import basic_ast as ast, op, is_a @@ -118,8 +119,13 @@ def walk__startswith(self, node): class Es6SearchService(search.SearchService): - def __init__(self, es: elasticsearch.Elasticsearch): + def __init__( + self, + es: elasticsearch.Elasticsearch, + mapping_repo: Es6MappingRepository, + ): self.es: elasticsearch.Elasticsearch = es + self.mapping_repo = mapping_repo self.query_builder = EsQueryBuilder() self.parser = query_dsl.KarpQueryV6Parser( semantics=query_dsl.KarpQueryV6ModelBuilderSemantics() diff --git a/karp/search_infrastructure/repositories/es6_indicies.py b/karp/search_infrastructure/repositories/es6_indicies.py index 619f7978..f8889187 100644 --- a/karp/search_infrastructure/repositories/es6_indicies.py +++ b/karp/search_infrastructure/repositories/es6_indicies.py @@ -15,6 +15,7 @@ IndexUnitOfWork, ) from karp.search.domain.errors import UnsupportedField +from karp.search_infrastructure.elasticsearch6 import Es6MappingRepository logger = logging.getLogger(__name__) @@ -24,7 +25,13 @@ class Es6Index(Index): - def __init__(self, es: elasticsearch.Elasticsearch, *, index_prefix: str = ""): + def __init__( + self, + es: elasticsearch.Elasticsearch, + mapping_repo: Es6MappingRepository, + *, + index_prefix: str = "" + ): self.es: elasticsearch.Elasticsearch = es self.index_prefix = index_prefix if not self.es.indices.exists(index=KARP_CONFIGINDEX): @@ -471,9 +478,15 @@ def __init__( es: elasticsearch.Elasticsearch, event_bus: EventBus, index_prefix: str, + mapping_repo: Es6MappingRepository, + ) -> None: super().__init__(event_bus=event_bus) - self._index = Es6Index(es=es, index_prefix=index_prefix) + self._index = Es6Index( + es=es, + mapping_repo=mapping_repo, + index_prefix=index_prefix, + ) # @classmethod # def from_dict(cls, **kwargs): From 49724c53b78765b3360fcb3b8142ebc82dab4651 Mon Sep 17 00:00:00 2001 From: Kristoffer Andersson Date: Thu, 22 Dec 2022 10:53:10 +0100 Subject: [PATCH 3/5] refactor: factor out ES6mappingRepo --- karp/cliapp/subapps/entries_subapp.py | 8 +- .../elasticsearch6/__init__.py | 6 +- .../elasticsearch6/es_mapping_repo.py | 654 ++---------------- .../queries/es6_search_service.py | 225 +----- .../repositories/es6_indicies.py | 270 +------- pyproject.toml | 3 +- 6 files changed, 92 insertions(+), 1074 deletions(-) diff --git a/karp/cliapp/subapps/entries_subapp.py b/karp/cliapp/subapps/entries_subapp.py index 637ec751..a1498017 100644 --- a/karp/cliapp/subapps/entries_subapp.py +++ b/karp/cliapp/subapps/entries_subapp.py @@ -162,13 +162,11 @@ def validate_entries( ), ): typer.echo(f"reading from {path or 'stdin'} ...", err=True) - err_output = None if not output and path: output = Path(f"{path}.v6.jsonl") - if not err_output and output: - err_output = Path(f"{output}.errors.jsonl") + err_output = Path(f"{output}.errors.jsonl") if config_path and resource_id_raw: typer.echo("You can't provide both '--resource_id' and '--config/-c'", err=True) @@ -211,9 +209,9 @@ def validate_entries( ) if error_counter.counter > 0: error_code = 130 - print( + typer.echo( f'{error_counter.counter} entries failed validation, see "{err_output}"', - file=sys.stderr, + err=True, ) if error_code: diff --git a/karp/search_infrastructure/elasticsearch6/__init__.py b/karp/search_infrastructure/elasticsearch6/__init__.py index 69578ba9..0a787627 100644 --- a/karp/search_infrastructure/elasticsearch6/__init__.py +++ b/karp/search_infrastructure/elasticsearch6/__init__.py @@ -1,4 +1,6 @@ -from karp.search_infrastructure.elasticsearch6.es_mapping_repo import Es6MappingRepository +from karp.search_infrastructure.elasticsearch6.es_mapping_repo import ( + Es6MappingRepository, +) -__all__ = ["Es6MappingRepository"] \ No newline at end of file +__all__ = ["Es6MappingRepository"] diff --git a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py index 63932613..71405be2 100644 --- a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py +++ b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py @@ -15,6 +15,7 @@ IndexEntry, IndexUnitOfWork, ) +from karp.search.domain.errors import UnsupportedField logger = logging.getLogger("karp") @@ -24,7 +25,13 @@ class MappingRepository(abc.ABC): - pass + @abc.abstractmethod + def get_index_name(self, resource_id: str) -> str: + ... + + @abc.abstractmethod + def get_alias_name(self, resource_id: str) -> str: + ... class Es6MappingRepository(MappingRepository): @@ -56,113 +63,55 @@ def ensure_config_index_exist(self) -> None: "dynamic": False, "properties": { "index_name": {"type": "text"}, - "alias_name": {"type": "text"} + "alias_name": {"type": "text"}, }, } }, }, ) - def create_index(self, resource_id: str, config): - print("creating es mapping ...") - mapping = create_es6_mapping(config) - - properties = mapping["properties"] - properties["freetext"] = {"type": "text"} - disabled_property = {"enabled": False} - properties["_entry_version"] = disabled_property - properties["_last_modified"] = disabled_property - properties["_last_modified_by"] = disabled_property - - body = { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1, - "refresh_interval": -1, - }, - "mappings": {"entry": mapping}, - } - - logger.debug(f"creating index '{index_name}' ...") - result = self.es.indices.create(index=index_name, body=body) - if "error" in result: - print("failed to create index") - raise RuntimeError("failed to create index") - print("index created") - self._set_index_name_for_resource(resource_id, index_name) - return index_name - def create_index_name(self, resource_id: str) -> str: date = datetime.now().strftime("%Y-%m-%d-%H%M%S%f") return f"{self._prefix}{resource_id}_{date}" - + def create_alias_name(self, resource_id: str) -> str: return f"{self._prefix}{resource_id}" - - def set_index_name_for_resource(self, resource_id: str, index_name: str) -> str: + + def _update_config(self, resource_id: str) -> dict[str, str]: + index_name = self.create_index_name(resource_id) + alias_name = self.create_alias_name(resource_id) + names = {"index_name": index_name, "alias_name": alias_name} self.es.index( index=self._config_index, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE, - body={"index_name": index_name}, + body=names, ) - return index_name + return names - def get_index_name_for_resource(self, resource_id: str) -> str: + def get_index_name(self, resource_id: str) -> str: try: res = self.es.get( index=self._config_index, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE ) except es_exceptions.NotFoundError as err: - logger.error( + logger.info( "didn't find index_name for resource '%s' details: %s", resource_id, err ) - return self.set_index_name_for_resource(resource_id, self.create_index_name(resource_id)) + return self._update_config(resource_id)["index_name"] return res["_source"]["index_name"] - def publish_index(self, resource_id: str): - if self.es.indices.exists_alias(name=resource_id): - self.es.indices.delete_alias(name=resource_id, index="*") - - index_name = self._get_index_name_for_resource(resource_id) - self.on_publish_resource(resource_id, index_name) - print(f"publishing '{resource_id}' => '{index_name}'") - self.es.indices.put_alias(name=resource_id, index=index_name) - - def add_entries(self, resource_id: str, entries: List[IndexEntry]): - index_name = self._get_index_name_for_resource(resource_id) - index_to_es = [] - for entry in entries: - assert isinstance(entry, IndexEntry) - # entry.update(metadata.to_dict()) - index_to_es.append( - { - "_index": index_name, - "_id": entry.id, - "_type": "entry", - "_source": entry.entry, - } + def get_alias_name(self, resource_id: str) -> str: + try: + res = self.es.get( + index=self._config_index, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE ) - - elasticsearch.helpers.bulk(self.es, index_to_es, refresh=True) - - def delete_entry( - self, - resource_id: str, - *, - entry_id: Optional[str] = None, - entry: Optional[Entry] = None, - ): - if not entry and not entry_id: - raise ValueError("Must give either 'entry' or 'entry_id'.") - if entry: - entry_id = entry.entry_id - self.es.delete( - index=resource_id, - doc_type="entry", - id=entry_id, - refresh=True, - ) + except es_exceptions.NotFoundError as err: + logger.info( + "didn't find alias_name for resource '%s' details: %s", resource_id, err + ) + return self._update_config(resource_id)["alias_name"] + return res["_source"]["alias_name"] @staticmethod def get_analyzed_fields_from_mapping( @@ -172,13 +121,12 @@ def get_analyzed_fields_from_mapping( for prop_name, prop_values in properties.items(): if "properties" in prop_values: - res = Es6Index.get_analyzed_fields_from_mapping( + res = Es6MappingRepository.get_analyzed_fields_from_mapping( prop_values["properties"] ) - analyzed_fields.extend([prop_name + "." + prop for prop in res]) - else: - if prop_values["type"] == "text": - analyzed_fields.append(prop_name) + analyzed_fields.extend([f"{prop_name}.{prop}" for prop in res]) + elif prop_values["type"] == "text": + analyzed_fields.append(prop_name) return analyzed_fields def _init_field_mapping( @@ -209,10 +157,14 @@ def _init_field_mapping( and "entry" in mapping[index]["mappings"] and "properties" in mapping[index]["mappings"]["entry"] ): - field_mapping[alias] = Es6Index.get_analyzed_fields_from_mapping( + field_mapping[ + alias + ] = Es6MappingRepository.get_analyzed_fields_from_mapping( mapping[index]["mappings"]["entry"]["properties"] ) - sortable_fields[alias] = Es6Index.create_sortable_map_from_mapping( + sortable_fields[ + alias + ] = Es6MappingRepository.create_sortable_map_from_mapping( mapping[index]["mappings"]["entry"]["properties"] ) return field_mapping, sortable_fields @@ -228,15 +180,16 @@ def _get_all_aliases(self) -> List[Tuple[str, str]]: :return: a list of tuples (alias_name, index_name) """ result = self.es.cat.aliases(h="alias,index") - print(f"{result}") + logger.debug(f"{result}") index_names = [] for index_name in result.split("\n")[:-1]: - print(f"index_name = {index_name}") + logger.debug(f"index_name = {index_name}") if index_name[0] != ".": - groups = re.search(r"([^ ]*) +(.*)", index_name).groups() - alias = groups[0] - index = groups[1] - index_names.append((alias, index)) + if match := re.search(r"([^ ]*) +(.*)", index_name): + groups = match.groups() + alias = groups[0] + index = groups[1] + index_names.append((alias, index)) return index_names def translate_sort_fields( @@ -272,7 +225,7 @@ def translate_sort_fields( return translated_sort_fields def translate_sort_field(self, resource_id: str, sort_value: str) -> List[str]: - print( + logger.debug( f"es6_indextranslate_sort_field: sortable_fields[{resource_id}] = {self.sortable_fields[resource_id]}" ) if sort_value in self.sortable_fields[resource_id]: @@ -291,12 +244,12 @@ def on_publish_resource(self, alias_name: str, index_name: str): ): self.analyzed_fields[ alias_name - ] = Es6Index.get_analyzed_fields_from_mapping( + ] = Es6MappingRepository.get_analyzed_fields_from_mapping( mapping[index_name]["mappings"]["entry"]["properties"] ) self.sortable_fields[ alias_name - ] = Es6Index.create_sortable_map_from_mapping( + ] = Es6MappingRepository.create_sortable_map_from_mapping( mapping[index_name]["mappings"]["entry"]["properties"] ) @@ -355,10 +308,10 @@ def recursive_field(parent_schema, parent_field_name, parent_field_def): fun = parent_field_def["function"] if list(fun.keys())[0] == "multi_ref": res_object = fun["multi_ref"]["result"] - recursive_field(parent_schema, "v_" + parent_field_name, res_object) + recursive_field(parent_schema, f"v_{parent_field_name}", res_object) if "result" in fun: res_object = fun["result"] - recursive_field(parent_schema, "v_" + parent_field_name, res_object) + recursive_field(parent_schema, f"v_{parent_field_name}", res_object) return if parent_field_def.get("ref"): if "field" in parent_field_def["ref"]: @@ -367,7 +320,7 @@ def recursive_field(parent_schema, parent_field_name, parent_field_def): res_object = {} res_object.update(parent_field_def) del res_object["ref"] - recursive_field(parent_schema, "v_" + parent_field_name, res_object) + recursive_field(parent_schema, f"v_{parent_field_name}", res_object) if parent_field_def["type"] != "object": # TODO this will not work when we have user defined types, s.a. saldoid # TODO number can be float/non-float, strings can be keyword or text in need of analyzing etc. @@ -395,7 +348,7 @@ def recursive_field(parent_schema, parent_field_name, parent_field_def): parent_schema["properties"][parent_field_name] = result for field_name, field_def in fields.items(): - print(f"creating mapping for field '{field_name}'") + logger.debug(f"creating mapping for field '{field_name}'") recursive_field(es_mapping, field_name, field_def) return es_mapping @@ -451,519 +404,28 @@ def create_es6_mapping(config: Dict) -> Dict: return mapping -class Es6IndexUnitOfWork(IndexUnitOfWork): +class Es6MappingRepositoryUnitOfWork(IndexUnitOfWork): def __init__( self, es: elasticsearch.Elasticsearch, event_bus: EventBus, ) -> None: super().__init__(event_bus=event_bus) - self._index = Es6Index(es=es) + self._index = Es6MappingRepository(es=es) # @classmethod # def from_dict(cls, **kwargs): # return cls() def _commit(self): - logger.debug("Calling _commit in Es6IndexUnitOfWork") + logger.debug("Calling _commit in Es6MappingRepositoryUnitOfWork") def rollback(self): return super().rollback() @property - def repo(self) -> Es6Index: + def repo(self) -> Es6MappingRepository: return self._index def _close(self): pass - - -import json -import logging -import re -from datetime import datetime -from typing import Any, Dict, List, Optional, Set, Tuple, Union - -import elasticsearch -import elasticsearch.helpers # pyre-ignore -import elasticsearch_dsl as es_dsl # pyre-ignore - -# from karp import query_dsl -from karp.search.application.queries import ( - QueryRequest, - SearchService, -) -from karp.search.domain.errors import ( - UnsupportedField, -) # IncompleteQuery,; UnsupportedQuery, -from karp.lex.domain.entities.entry import Entry -from karp.lex.domain.entities.resource import Resource -from karp.search.domain import query_dsl -from karp.search_infrastructure.elasticsearch6 import es_config -from .es_query import EsQuery - -# from karp.query_dsl import basic_ast as ast, op, is_a - - -logger = logging.getLogger("karp") - -KARP_CONFIGINDEX = "karp_config" -KARP_CONFIGINDEX_TYPE = "configs" - - -class EsQueryBuilder(query_dsl.NodeWalker): - def walk_object(self, node): - return node - - def walk__and(self, node): - result = self.walk(node.exps[0]) - for n in node.exps[1:]: - result = result & self.walk(n) - - return result - - def walk__contains(self, node): - return es_dsl.Q( - "regexp", **{self.walk(node.field): f".*{self.walk(node.arg)}.*"} - ) - - def walk__endswith(self, node): - return es_dsl.Q("regexp", **{self.walk(node.field): f".*{self.walk(node.arg)}"}) - - # def walk__equals(self, node): - # return es_dsl.Q( - # "match", - # **{ - # self.walk(node.field): {"query": self.walk(node.arg), "operator": "and"} - # }, - # ) - - def walk__equals(self, node): - return es_dsl.Q( - "match", - **{ - self.walk(node.field): {"query": self.walk(node.arg), "operator": "and"} - }, - ) - - def walk__exists(self, node): - return es_dsl.Q("exists", field=self.walk(node.field)) - - def walk__freergxp(self, node): - return es_dsl.Q( - "query_string", query=f"/{self.walk(node.arg)}/", default_field="*" - ) - - def walk__freetext_string(self, node): - return es_dsl.Q("multi_match", query=self.walk(node.arg), fuzziness=1) - - def walk__freetext_any_but_string(self, node): - return es_dsl.Q("multi_match", query=self.walk(node.arg)) - - def walk_range(self, node): - return es_dsl.Q( - "range", - **{self.walk(node.field): {self.walk(node.op): self.walk(node.arg)}}, - ) - - walk__gt = walk_range - walk__gte = walk_range - walk__lt = walk_range - walk__lte = walk_range - - def walk__missing(self, node): - return es_dsl.Q( - "bool", must_not=es_dsl.Q("exists", field=self.walk(node.field)) - ) - - def walk__not(self, node): - return ~self.walk(node.expr) - - def walk__or(self, node): - result = self.walk(node.exps[0]) - for n in node.exps[1:]: - result = result | self.walk(n) - - return result - - def walk__regexp(self, node): - return es_dsl.Q("regexp", **{self.walk(node.field): self.walk(node.arg)}) - - def walk__startswith(self, node): - return es_dsl.Q("regexp", **{self.walk(node.field): f"{self.walk(node.arg)}.*"}) - - -class Es6SearchService(SearchService): - def __init__(self, es: elasticsearch.Elasticsearch): - self.es: elasticsearch.Elasticsearch = es - self.query_builder = EsQueryBuilder() - self.parser = query_dsl.KarpQueryV6Parser( - semantics=query_dsl.KarpQueryV6ModelBuilderSemantics() - ) - if not self.es.indices.exists(index=KARP_CONFIGINDEX): - self.es.indices.create( - index=KARP_CONFIGINDEX, - body={ - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1, - "refresh_interval": -1, - }, - "mappings": { - KARP_CONFIGINDEX_TYPE: { - "dynamic": False, - "properties": {"index_name": {"type": "text"}}, - } - }, - }, - ) - analyzed_fields, sortable_fields = self._init_field_mapping() - self.analyzed_fields: Dict[str, List[str]] = analyzed_fields - self.sortable_fields: Dict[str, Dict[str, List[str]]] = sortable_fields - - def _get_index_name_for_resource(self, resource_id: str) -> str: - res = self.es.get( - index=KARP_CONFIGINDEX, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE - ) - return res["_source"]["index_name"] - - @staticmethod - def get_analyzed_fields_from_mapping( - properties: Dict[str, Dict[str, Dict[str, Any]]] - ) -> List[str]: - analyzed_fields = [] - - for prop_name, prop_values in properties.items(): - if "properties" in prop_values: - res = Es6SearchService.get_analyzed_fields_from_mapping( - prop_values["properties"] - ) - analyzed_fields.extend([prop_name + "." + prop for prop in res]) - else: - if prop_values["type"] == "text": - analyzed_fields.append(prop_name) - return analyzed_fields - - def _init_field_mapping( - self, - ) -> Tuple[Dict[str, List[str]], Dict[str, Dict[str, List[str]]]]: - """ - Create a field mapping based on the mappings of elasticsearch - currently the only information we need is if a field is analyzed (i.e. text) - or not. - """ - - field_mapping: Dict[str, List[str]] = {} - sortable_fields = {} - # Doesn't work for tests, can't find resource_definition - # for resource in resourcemgr.get_available_resources(): - # mapping = self.es.indices.get_mapping(index=resource.resource_id) - # field_mapping[resource.resource_id] = parse_mapping( - # next(iter(mapping.values()))['mappings']['entry']['properties'] - # ) - aliases = self._get_all_aliases() - mapping: Dict[ - str, Dict[str, Dict[str, Dict[str, Dict]]] - ] = self.es.indices.get_mapping() - # print(f"mapping = {mapping}") - for (alias, index) in aliases: - if ( - "mappings" in mapping[index] - and "entry" in mapping[index]["mappings"] - and "properties" in mapping[index]["mappings"]["entry"] - ): - field_mapping[ - alias - ] = Es6SearchService.get_analyzed_fields_from_mapping( - mapping[index]["mappings"]["entry"]["properties"] - ) - sortable_fields[ - alias - ] = Es6SearchService.create_sortable_map_from_mapping( - mapping[index]["mappings"]["entry"]["properties"] - ) - return field_mapping, sortable_fields - - def _get_index_mappings( - self, index: Optional[str] = None - ) -> Dict[str, Dict[str, Dict[str, Dict[str, Dict]]]]: - kwargs = {"index": index} if index is not None else {} - return self.es.indices.get_mapping(**kwargs) - - def _get_all_aliases(self) -> List[Tuple[str, str]]: - """ - :return: a list of tuples (alias_name, index_name) - """ - result = self.es.cat.aliases(h="alias,index") - print(f"{result}") - index_names = [] - for index_name in result.split("\n")[:-1]: - print(f"index_name = {index_name}") - if index_name[0] != ".": - groups = re.search(r"([^ ]*) +(.*)", index_name).groups() - alias = groups[0] - index = groups[1] - index_names.append((alias, index)) - return index_names - - def build_query(self, args, resource_str: str) -> EsQuery: - query = EsQuery() - query.parse_arguments(args, resource_str) - return query - - def _format_result(self, resource_ids, response): - logger.debug( - "es6_search_service_format_result called with resource_ids=%s", resource_ids - ) - - def format_entry(entry): - - dict_entry = entry.to_dict() - version = dict_entry.pop("_entry_version", None) - last_modified_by = dict_entry.pop("_last_modified_by", None) - last_modified = dict_entry.pop("_last_modified", None) - return { - "id": entry.meta.id, - "version": version, - "last_modified": last_modified, - "last_modified_by": last_modified_by, - "resource": next( - resource - for resource in resource_ids - if entry.meta.index.startswith(resource) - ), - "entry": dict_entry, - } - - result = { - "total": response.hits.total, - "hits": [format_entry(entry) for entry in response], - } - return result - - def query(self, request: QueryRequest): - print(f"query called with {request}") - query = EsQuery.from_query_request(request) - return self.search_with_query(query) - - def query_split(self, request: QueryRequest): - print(f"query called with {request}") - query = EsQuery.from_query_request(request) - query.split_results = True - return self.search_with_query(query) - - def search_with_query(self, query: EsQuery): - logger.info("search_with_query called with query=%s", query) - print("search_with_query called with query={}".format(query)) - es_query = None - if query.q: - model = self.parser(query.q) - es_query = self.query_builder.walk(model) - if query.split_results: - ms = es_dsl.MultiSearch(using=self.es) - - for resource in query.resources: - s = es_dsl.Search(index=resource) - - if es_query is not None: - s = s.query(es_query) - s = s[query.from_ : query.from_ + query.size] - if query.sort: - s = s.sort(*self.translate_sort_fields([resource], query.sort)) - elif resource in query.sort_dict: - s = s.sort( - *self.translate_sort_fields( - [resource], query.sort_dict[resource] - ) - ) - ms = ms.add(s) - - responses = ms.execute() - result = {"total": 0, "hits": {}} - for i, response in enumerate(responses): - result["hits"][query.resources[i]] = self._format_result( - query.resources, response - ).get("hits", []) - result["total"] += response.hits.total - if query.lexicon_stats: - if "distribution" not in result: - result["distribution"] = {} - result["distribution"][query.resources[i]] = response.hits.total - return result - else: - s = es_dsl.Search(using=self.es, index=query.resources, doc_type="entry") - if es_query is not None: - s = s.query(es_query) - - s = s[query.from_ : query.from_ + query.size] - - if query.lexicon_stats: - s.aggs.bucket( - "distribution", "terms", field="_index", size=len(query.resources) - ) - if query.sort: - s = s.sort(*self.translate_sort_fields(query.resources, query.sort)) - elif query.sort_dict: - sort_fields = [] - for resource, sort in query.sort_dict.items(): - sort_fields.extend(self.translate_sort_fields([resource], sort)) - s = s.sort(*sort_fields) - logger.debug("s = %s", s.to_dict()) - response = s.execute() - - # TODO format response in a better way, because the whole response takes up too much space in the logs - # logger.debug('response = {}'.format(response.to_dict())) - - logger.debug("calling _format_result") - result = self._format_result(query.resources, response) - if query.lexicon_stats: - result["distribution"] = {} - for bucket in response.aggregations.distribution.buckets: - key = bucket["key"] - value = bucket["doc_count"] - result["distribution"][key.rsplit("_", 1)[0]] = value - - # logger.debug("return result = %s", result) - return result - - def translate_sort_fields( - self, resources: List[str], sort_values: List[str] - ) -> List[Union[str, Dict[str, Dict[str, str]]]]: - """Translate sort field to ES sort fields. - - Arguments: - sort_values {List[str]} -- values to sort by - - Returns: - List[str] -- values that ES can sort by. - """ - translated_sort_fields: List[Union[str, Dict[str, Dict[str, str]]]] = [] - for sort_value in sort_values: - sort_order = None - if "|" in sort_value: - sort_value, sort_order = sort_value.split("|", 1) - for resource_id in resources: - if sort_order: - translated_sort_fields.extend( - ( - {field: {"order": sort_order}} - for field in self.translate_sort_field( - resource_id, sort_value - ) - ) - ) - translated_sort_fields.extend( - self.translate_sort_field(resource_id, sort_value) - ) - - return translated_sort_fields - - def translate_sort_field(self, resource_id: str, sort_value: str) -> List[str]: - print( - f"es6_search_servicetranslate_sort_field: sortable_fields[{resource_id}] = {self.sortable_fields[resource_id]}" - ) - if sort_value in self.sortable_fields[resource_id]: - return self.sortable_fields[resource_id][sort_value] - else: - raise UnsupportedField( - f"You can't sort by field '{sort_value}' for resource '{resource_id}'" - ) - - def search_ids(self, resource_id: str, entry_ids: str): - logger.info( - "Called EsSearch.search_ids(self, args, resource_id, entry_ids) with:" - ) - logger.info(" resource_id = {}".format(resource_id)) - logger.info(" entry_ids = {}".format(entry_ids)) - entries = entry_ids.split(",") - query = es_dsl.Q("terms", _id=entries) - logger.debug("query = {}".format(query)) - s = es_dsl.Search(using=self.es, index=resource_id).query(query) - logger.debug("s = {}".format(s.to_dict())) - response = s.execute() - - return self._format_result([resource_id], response) - - def statistics(self, resource_id: str, field: str): - s = es_dsl.Search(using=self.es, index=resource_id) - s = s[0:0] - - if field in self.analyzed_fields[resource_id]: - field += ".raw" - - logger.debug("Statistics: analyzed fields are:") - logger.debug(json.dumps(self.analyzed_fields, indent=4)) - logger.debug( - "Doing aggregations on resource_id: {resource_id}, on field {field}".format( - resource_id=resource_id, field=field - ) - ) - s.aggs.bucket("field_values", "terms", field=field, size=2147483647) - response = s.execute() - print(f"{response=}") - return [ - {"value": bucket["key"], "count": bucket["doc_count"]} - for bucket in response.aggregations.field_values.buckets - ] - - def on_publish_resource(self, alias_name: str, index_name: str): - mapping = self._get_index_mappings(index=index_name) - if ( - "mappings" in mapping[index_name] - and "entry" in mapping[index_name]["mappings"] - and "properties" in mapping[index_name]["mappings"]["entry"] - ): - self.analyzed_fields[ - alias_name - ] = Es6SearchService.get_analyzed_fields_from_mapping( - mapping[index_name]["mappings"]["entry"]["properties"] - ) - self.sortable_fields[ - alias_name - ] = Es6SearchService.create_sortable_map_from_mapping( - mapping[index_name]["mappings"]["entry"]["properties"] - ) - - @staticmethod - def create_sortable_map_from_mapping(properties: Dict) -> Dict[str, List[str]]: - sortable_map = {} - - def parse_prop_value(sort_map, base_name, prop_name, prop_value: Dict): - if "properties" in prop_value: - for ext_name, ext_value in prop_value["properties"].items(): - ext_base_name = f"{base_name}.{ext_name}" - parse_prop_value(sort_map, ext_base_name, ext_base_name, ext_value) - return - if prop_value["type"] in [ - "boolean", - "date", - "double", - "keyword", - "long", - "ip", - ]: - sort_map[base_name] = [prop_name] - sort_map[prop_name] = [prop_name] - return - if prop_value["type"] == "text": - if "fields" in prop_value: - for ext_name, ext_value in prop_value["fields"].items(): - parse_prop_value( - sort_map, base_name, f"{base_name}.{ext_name}", ext_value - ) - return - - for prop_name, prop_value in properties.items(): - parse_prop_value(sortable_map, prop_name, prop_name, prop_value) - # if prop_value["type"] in ["boolean", "date", "double", "keyword", "long", "ip"]: - # sortable_map[prop_name] = prop_name - # if prop_value["type"] == "text": - # if "fields" in prop_value: - - return sortable_map - - -# def parse_sortable_fields(properties: Dict[str, Any]) -> Dict[str, List[str]]: -# for prop_name, prop_value in properties.items(): -# if prop_value["type"] in ["boolean", "date", "double", "keyword", "long", "ip"]: -# return [prop_name] diff --git a/karp/search_infrastructure/queries/es6_search_service.py b/karp/search_infrastructure/queries/es6_search_service.py index dab3f67f..f929794d 100644 --- a/karp/search_infrastructure/queries/es6_search_service.py +++ b/karp/search_infrastructure/queries/es6_search_service.py @@ -31,9 +31,6 @@ logger = logging.getLogger(__name__) -KARP_CONFIGINDEX = "karp_config" -KARP_CONFIGINDEX_TYPE = "configs" - class EsQueryBuilder(query_dsl.NodeWalker): def walk_object(self, node): @@ -130,113 +127,6 @@ def __init__( self.parser = query_dsl.KarpQueryV6Parser( semantics=query_dsl.KarpQueryV6ModelBuilderSemantics() ) - if not self.es.indices.exists(index=KARP_CONFIGINDEX): - self.es.indices.create( - index=KARP_CONFIGINDEX, - body={ - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1, - "refresh_interval": -1, - }, - "mappings": { - KARP_CONFIGINDEX_TYPE: { - "dynamic": False, - "properties": {"index_name": {"type": "text"}}, - } - }, - }, - ) - analyzed_fields, sortable_fields = self._init_field_mapping() - self.analyzed_fields: Dict[str, List[str]] = analyzed_fields - self.sortable_fields: Dict[str, Dict[str, List[str]]] = sortable_fields - - def _get_index_name_for_resource(self, resource_id: str) -> str: - res = self.es.get( - index=KARP_CONFIGINDEX, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE - ) - return res["_source"]["index_name"] - - @staticmethod - def get_analyzed_fields_from_mapping( - properties: Dict[str, Dict[str, Dict[str, Any]]] - ) -> List[str]: - analyzed_fields = [] - - for prop_name, prop_values in properties.items(): - if "properties" in prop_values: - res = Es6SearchService.get_analyzed_fields_from_mapping( - prop_values["properties"] - ) - analyzed_fields.extend([prop_name + "." + prop for prop in res]) - else: - if prop_values["type"] == "text": - analyzed_fields.append(prop_name) - return analyzed_fields - - def _init_field_mapping( - self, - ) -> Tuple[Dict[str, List[str]], Dict[str, Dict[str, List[str]]]]: - """ - Create a field mapping based on the mappings of elasticsearch - currently the only information we need is if a field is analyzed (i.e. text) - or not. - """ - - field_mapping: Dict[str, List[str]] = {} - sortable_fields = {} - # Doesn't work for tests, can't find resource_definition - # for resource in resourcemgr.get_available_resources(): - # mapping = self.es.indices.get_mapping(index=resource.resource_id) - # field_mapping[resource.resource_id] = parse_mapping( - # next(iter(mapping.values()))['mappings']['entry']['properties'] - # ) - aliases = self._get_all_aliases() - mapping: Dict[ - str, Dict[str, Dict[str, Dict[str, Dict]]] - ] = self.es.indices.get_mapping() - # print(f"mapping = {mapping}") - for (alias, index) in aliases: - if ( - "mappings" in mapping[index] - and "entry" in mapping[index]["mappings"] - and "properties" in mapping[index]["mappings"]["entry"] - ): - field_mapping[ - alias - ] = Es6SearchService.get_analyzed_fields_from_mapping( - mapping[index]["mappings"]["entry"]["properties"] - ) - sortable_fields[ - alias - ] = Es6SearchService.create_sortable_map_from_mapping( - mapping[index]["mappings"]["entry"]["properties"] - ) - return field_mapping, sortable_fields - - def _get_index_mappings( - self, index: Optional[str] = None - ) -> Dict[str, Dict[str, Dict[str, Dict[str, Dict]]]]: - kwargs = {"index": index} if index is not None else {} - return self.es.indices.get_mapping(**kwargs) - - def _get_all_aliases(self) -> List[Tuple[str, str]]: - """ - :return: a list of tuples (alias_name, index_name) - """ - result = self.es.cat.aliases(h="alias,index") - logger.debug("ES aliases and indicies", extra={"result": result}) - index_names = [] - for index_name in result.split("\n")[:-1]: - logger.debug("existing index", extra={"index_name": index_name}) - if index_name[0] != ".": - opt_match = re.search(r"([^ ]*) +(.*)", index_name) - if opt_match: - groups = opt_match.groups() - alias = groups[0] - index = groups[1] - index_names.append((alias, index)) - return index_names def build_query(self, args, resource_str: str) -> EsQuery: query = EsQuery() @@ -304,7 +194,9 @@ def search_with_query(self, query: EsQuery): s = s.query(es_query) s = s[query.from_ : query.from_ + query.size] if query.sort: - s = s.sort(*self.translate_sort_fields([resource], query.sort)) + s = s.sort( + *self.mapping_repo.translate_sort_fields([resource], query.sort) + ) elif query.sort_dict and resource in query.sort_dict: s = s.sort( *self.translate_sort_fields( @@ -361,53 +253,6 @@ def search_with_query(self, query: EsQuery): # logger.debug("return result = %s", result) return result - def translate_sort_fields( - self, resources: List[str], sort_values: List[str] - ) -> List[Union[str, Dict[str, Dict[str, str]]]]: - """Translate sort field to ES sort fields. - - Arguments: - sort_values {List[str]} -- values to sort by - - Returns: - List[str] -- values that ES can sort by. - """ - translated_sort_fields: List[Union[str, Dict[str, Dict[str, str]]]] = [] - for sort_value in sort_values: - sort_order = None - if "|" in sort_value: - sort_value, sort_order = sort_value.split("|", 1) - for resource_id in resources: - if sort_order: - translated_sort_fields.extend( - ( - {field: {"order": sort_order}} - for field in self.translate_sort_field( - resource_id, sort_value - ) - ) - ) - translated_sort_fields.extend( - self.translate_sort_field(resource_id, sort_value) - ) - - return translated_sort_fields - - def translate_sort_field(self, resource_id: str, sort_value: str) -> List[str]: - logger.debug( - "sortable fields for resource", - extra={ - "resource_id": resource_id, - "sortable_fields": self.sortable_fields[resource_id], - }, - ) - if sort_value in self.sortable_fields[resource_id]: - return self.sortable_fields[resource_id][sort_value] - else: - raise UnsupportedField( - f"You can't sort by field '{sort_value}' for resource '{resource_id}'" - ) - def search_ids(self, resource_id: str, entry_ids: str): logger.info( "Called EsSearch.search_ids with:", @@ -424,7 +269,7 @@ def search_ids(self, resource_id: str, entry_ids: str): def statistics(self, resource_id: str, field: str) -> Iterable: s = es_dsl.Search(using=self.es, index=resource_id) - s = s[0:0] + s = s[:0] if field in self.analyzed_fields[resource_id]: field += ".raw" @@ -443,65 +288,3 @@ def statistics(self, resource_id: str, field: str) -> Iterable: {"value": bucket["key"], "count": bucket["doc_count"]} for bucket in response.aggregations.field_values.buckets ] - - def on_publish_resource(self, alias_name: str, index_name: str): - mapping = self._get_index_mappings(index=index_name) - if ( - "mappings" in mapping[index_name] - and "entry" in mapping[index_name]["mappings"] - and "properties" in mapping[index_name]["mappings"]["entry"] - ): - self.analyzed_fields[ - alias_name - ] = Es6SearchService.get_analyzed_fields_from_mapping( - mapping[index_name]["mappings"]["entry"]["properties"] - ) - self.sortable_fields[ - alias_name - ] = Es6SearchService.create_sortable_map_from_mapping( - mapping[index_name]["mappings"]["entry"]["properties"] - ) - - @staticmethod - def create_sortable_map_from_mapping(properties: dict) -> dict[str, list[str]]: - sortable_map: dict[str, list[str]] = {} - - def parse_prop_value(sort_map, base_name, prop_name, prop_value: Dict): - if "properties" in prop_value: - for ext_name, ext_value in prop_value["properties"].items(): - ext_base_name = f"{base_name}.{ext_name}" - parse_prop_value(sort_map, ext_base_name, ext_base_name, ext_value) - return - if prop_value["type"] in [ - "boolean", - "date", - "double", - "keyword", - "long", - "ip", - ]: - sort_map[base_name] = [prop_name] - sort_map[prop_name] = [prop_name] - return - if prop_value["type"] == "text": - if "fields" in prop_value: - for ext_name, ext_value in prop_value["fields"].items(): - parse_prop_value( - sort_map, base_name, f"{base_name}.{ext_name}", ext_value - ) - return - - for prop_name, prop_value in properties.items(): - parse_prop_value(sortable_map, prop_name, prop_name, prop_value) - # if prop_value["type"] in ["boolean", "date", "double", "keyword", "long", "ip"]: - # sortable_map[prop_name] = prop_name - # if prop_value["type"] == "text": - # if "fields" in prop_value: - - return sortable_map - - -# def parse_sortable_fields(properties: Dict[str, Any]) -> Dict[str, List[str]]: -# for prop_name, prop_value in properties.items(): -# if prop_value["type"] in ["boolean", "date", "double", "keyword", "long", "ip"]: -# return [prop_name] diff --git a/karp/search_infrastructure/repositories/es6_indicies.py b/karp/search_infrastructure/repositories/es6_indicies.py index f8889187..042726b5 100644 --- a/karp/search_infrastructure/repositories/es6_indicies.py +++ b/karp/search_infrastructure/repositories/es6_indicies.py @@ -20,9 +20,6 @@ logger = logging.getLogger(__name__) -KARP_CONFIGINDEX = "karp_config" -KARP_CONFIGINDEX_TYPE = "configs" - class Es6Index(Index): def __init__( @@ -30,36 +27,17 @@ def __init__( es: elasticsearch.Elasticsearch, mapping_repo: Es6MappingRepository, *, - index_prefix: str = "" + index_prefix: str = "", ): - self.es: elasticsearch.Elasticsearch = es + self.es = es + self.mapping_repo = mapping_repo self.index_prefix = index_prefix - if not self.es.indices.exists(index=KARP_CONFIGINDEX): - self.es.indices.create( - index=KARP_CONFIGINDEX, - body={ - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1, - "refresh_interval": -1, - }, - "mappings": { - KARP_CONFIGINDEX_TYPE: { - "dynamic": False, - "properties": {"index_name": {"type": "text"}}, - } - }, - }, - ) - analyzed_fields, sortable_fields = self._init_field_mapping() - self.analyzed_fields: Dict[str, List[str]] = analyzed_fields - self.sortable_fields: Dict[str, Dict[str, List[str]]] = sortable_fields @property def seen(self): return [] - def create_index(self, resource_id, config): + def create_index(self, resource_id: str, config): logger.info("creating es mapping") mapping = create_es6_mapping(config) @@ -83,7 +61,7 @@ def create_index(self, resource_id, config): "mappings": {"entry": mapping}, } - index_name = self._create_index_name(resource_id) + index_name = self.mapping_repo.get_index_name(resource_id) logger.info("creating index", extra={"index_name": index_name, "body": body}) result = self.es.indices.create(index=index_name, body=body) if "error" in result: @@ -92,50 +70,27 @@ def create_index(self, resource_id, config): ) raise RuntimeError("failed to create index") logger.info("index created") - self._set_index_name_for_resource(resource_id, index_name) - return index_name - - def _create_index_name(self, resource_id: str) -> str: - date = datetime.now().strftime("%Y-%m-%d-%H%M%S%f") - return f"{self.index_prefix}{resource_id}_{date}" - - def _set_index_name_for_resource(self, resource_id: str, index_name: str) -> str: - self.es.index( - index=KARP_CONFIGINDEX, - id=resource_id, - doc_type=KARP_CONFIGINDEX_TYPE, - body={"index_name": index_name}, - ) return index_name - def _get_index_name_for_resource(self, resource_id: str) -> str: - try: - res = self.es.get( - index=KARP_CONFIGINDEX, id=resource_id, doc_type=KARP_CONFIGINDEX_TYPE - ) - except es_exceptions.NotFoundError as err: - logger.error( - "didn't find index_name for resource '%s' details: %s", resource_id, err - ) - return self._set_index_name_for_resource( - resource_id, self._create_index_name(resource_id) - ) - return res["_source"]["index_name"] - def publish_index(self, resource_id: str): - if self.es.indices.exists_alias(name=resource_id): - self.es.indices.delete_alias(name=resource_id, index="*") + alias_name = self.mapping_repo.get_alias_name(resource_id) + if self.es.indices.exists_alias(name=alias_name): + self.es.indices.delete_alias(name=alias_name, index="*") - index_name = self._get_index_name_for_resource(resource_id) - self.on_publish_resource(resource_id, index_name) + index_name = self.mapping_repo.get_index_name(resource_id) + self.mapping_repo.on_publish_resource(alias_name, index_name) logger.info( "publishing resource", - extra={"resource_id": resource_id, "index_name": index_name}, + extra={ + "resource_id": resource_id, + "index_name": index_name, + "alias_name": alias_name, + }, ) - self.es.indices.put_alias(name=resource_id, index=index_name) + self.es.indices.put_alias(name=alias_name, index=index_name) def add_entries(self, resource_id: str, entries: List[IndexEntry]): - index_name = self._get_index_name_for_resource(resource_id) + index_name = self.mapping_repo.get_index_name(resource_id) index_to_es = [] for entry in entries: assert isinstance(entry, IndexEntry) @@ -165,7 +120,7 @@ def delete_entry( logger.info( "deleting entry", extra={"entry_id": entry_id, "resource_id": resource_id} ) - index_name = self._get_index_name_for_resource(resource_id) + index_name = self.mapping_repo.get_index_name(resource_id) try: self.es.delete( index=index_name, @@ -183,188 +138,6 @@ def delete_entry( }, ) - @staticmethod - def get_analyzed_fields_from_mapping( - properties: Dict[str, Dict[str, Dict[str, Any]]] - ) -> List[str]: - analyzed_fields = [] - - for prop_name, prop_values in properties.items(): - if "properties" in prop_values: - res = Es6Index.get_analyzed_fields_from_mapping( - prop_values["properties"] - ) - analyzed_fields.extend([prop_name + "." + prop for prop in res]) - else: - if prop_values["type"] == "text": - analyzed_fields.append(prop_name) - return analyzed_fields - - def _init_field_mapping( - self, - ) -> Tuple[Dict[str, List[str]], Dict[str, Dict[str, List[str]]]]: - """ - Create a field mapping based on the mappings of elasticsearch - currently the only information we need is if a field is analyzed (i.e. text) - or not. - """ - - field_mapping: Dict[str, List[str]] = {} - sortable_fields = {} - # Doesn't work for tests, can't find resource_definition - # for resource in resourcemgr.get_available_resources(): - # mapping = self.es.indices.get_mapping(index=resource.resource_id) - # field_mapping[resource.resource_id] = parse_mapping( - # next(iter(mapping.values()))['mappings']['entry']['properties'] - # ) - aliases = self._get_all_aliases() - mapping: Dict[ - str, Dict[str, Dict[str, Dict[str, Dict]]] - ] = self.es.indices.get_mapping() - # print(f"mapping = {mapping}") - for (alias, index) in aliases: - if ( - "mappings" in mapping[index] - and "entry" in mapping[index]["mappings"] - and "properties" in mapping[index]["mappings"]["entry"] - ): - field_mapping[alias] = Es6Index.get_analyzed_fields_from_mapping( - mapping[index]["mappings"]["entry"]["properties"] - ) - sortable_fields[alias] = Es6Index.create_sortable_map_from_mapping( - mapping[index]["mappings"]["entry"]["properties"] - ) - return field_mapping, sortable_fields - - def _get_index_mappings( - self, index: Optional[str] = None - ) -> Dict[str, Dict[str, Dict[str, Dict[str, Dict]]]]: - kwargs = {"index": index} if index is not None else {} - return self.es.indices.get_mapping(**kwargs) - - def _get_all_aliases(self) -> List[Tuple[str, str]]: - """ - :return: a list of tuples (alias_name, index_name) - """ - result = self.es.cat.aliases(h="alias,index") - print(f"{result}") - index_names = [] - for index_name in result.split("\n")[:-1]: - print(f"index_name = {index_name}") - if index_name[0] != ".": - match = re.search(r"([^ ]*) +(.*)", index_name) - if match: - groups = match.groups() - alias = groups[0] - index = groups[1] - index_names.append((alias, index)) - return index_names - - def translate_sort_fields( - self, resources: List[str], sort_values: List[str] - ) -> List[Union[str, Dict[str, Dict[str, str]]]]: - """Translate sort field to ES sort fields. - - Arguments: - sort_values {List[str]} -- values to sort by - - Returns: - List[str] -- values that ES can sort by. - """ - translated_sort_fields: List[Union[str, Dict[str, Dict[str, str]]]] = [] - for sort_value in sort_values: - sort_order = None - if "|" in sort_value: - sort_value, sort_order = sort_value.split("|", 1) - for resource_id in resources: - if sort_order: - translated_sort_fields.extend( - ( - {field: {"order": sort_order}} - for field in self.translate_sort_field( - resource_id, sort_value - ) - ) - ) - translated_sort_fields.extend( - self.translate_sort_field(resource_id, sort_value) - ) - - return translated_sort_fields - - def translate_sort_field(self, resource_id: str, sort_value: str) -> List[str]: - print( - f"es6_indextranslate_sort_field: sortable_fields[{resource_id}] = {self.sortable_fields[resource_id]}" - ) - if sort_value in self.sortable_fields[resource_id]: - return self.sortable_fields[resource_id][sort_value] - else: - raise UnsupportedField( - f"You can't sort by field '{sort_value}' for resource '{resource_id}'" - ) - - def on_publish_resource(self, alias_name: str, index_name: str): - mapping = self._get_index_mappings(index=index_name) - if ( - "mappings" in mapping[index_name] - and "entry" in mapping[index_name]["mappings"] - and "properties" in mapping[index_name]["mappings"]["entry"] - ): - self.analyzed_fields[ - alias_name - ] = Es6Index.get_analyzed_fields_from_mapping( - mapping[index_name]["mappings"]["entry"]["properties"] - ) - self.sortable_fields[ - alias_name - ] = Es6Index.create_sortable_map_from_mapping( - mapping[index_name]["mappings"]["entry"]["properties"] - ) - - @staticmethod - def create_sortable_map_from_mapping(properties: Dict) -> Dict[str, List[str]]: - sortable_map = {} - - def parse_prop_value(sort_map, base_name, prop_name, prop_value: Dict): - if "properties" in prop_value: - for ext_name, ext_value in prop_value["properties"].items(): - ext_base_name = f"{base_name}.{ext_name}" - parse_prop_value(sort_map, ext_base_name, ext_base_name, ext_value) - return - if prop_value["type"] in [ - "boolean", - "date", - "double", - "keyword", - "long", - "ip", - ]: - sort_map[base_name] = [prop_name] - sort_map[prop_name] = [prop_name] - return - if prop_value["type"] == "text": - if "fields" in prop_value: - for ext_name, ext_value in prop_value["fields"].items(): - parse_prop_value( - sort_map, base_name, f"{base_name}.{ext_name}", ext_value - ) - return - - for prop_name, prop_value in properties.items(): - parse_prop_value(sortable_map, prop_name, prop_name, prop_value) - # if prop_value["type"] in ["boolean", "date", "double", "keyword", "long", "ip"]: - # sortable_map[prop_name] = prop_name - # if prop_value["type"] == "text": - # if "fields" in prop_value: - - return sortable_map - - -# def parse_sortable_fields(properties: Dict[str, Any]) -> Dict[str, List[str]]: -# for prop_name, prop_value in properties.items(): -# if prop_value["type"] in ["boolean", "date", "double", "keyword", "long", "ip"]: -# return [prop_name] - def _create_es_mapping(config): es_mapping = {"dynamic": False, "properties": {}} @@ -376,10 +149,10 @@ def recursive_field(parent_schema, parent_field_name, parent_field_def): fun = parent_field_def["function"] if list(fun.keys())[0] == "multi_ref": res_object = fun["multi_ref"]["result"] - recursive_field(parent_schema, "v_" + parent_field_name, res_object) + recursive_field(parent_schema, f"v_{parent_field_name}", res_object) if "result" in fun: res_object = fun["result"] - recursive_field(parent_schema, "v_" + parent_field_name, res_object) + recursive_field(parent_schema, f"v_{parent_field_name}", res_object) return if parent_field_def.get("ref"): if "field" in parent_field_def["ref"]: @@ -388,7 +161,7 @@ def recursive_field(parent_schema, parent_field_name, parent_field_def): res_object = {} res_object.update(parent_field_def) del res_object["ref"] - recursive_field(parent_schema, "v_" + parent_field_name, res_object) + recursive_field(parent_schema, f"v_{parent_field_name}", res_object) if parent_field_def["type"] != "object": # TODO this will not work when we have user defined types, s.a. saldoid # TODO number can be float/non-float, strings can be keyword or text in need of analyzing etc. @@ -479,7 +252,6 @@ def __init__( event_bus: EventBus, index_prefix: str, mapping_repo: Es6MappingRepository, - ) -> None: super().__init__(event_bus=event_bus) self._index = Es6Index( diff --git a/pyproject.toml b/pyproject.toml index 4a0b1b17..d768dc3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -84,7 +84,7 @@ urllib3 = "^1.26.13" mysql = ["mysqlclient", "PyMySQL", "aiomysql"] sqlite = ["aiosqlite"] -[tool.poetry.dev-dependencies] +[tool.poetry.group.dev.dependencies] uvicorn = "^0.18.3" pytest = "^7.1.3" pytest-cov = "^3.0.0" @@ -107,6 +107,7 @@ mkdocstrings-python = "^0.7.1" types-factory-boy = "^0.3.1" types-deprecated = "^1.2.9" httpx = "^0.23.1" +pre-commit = "^2.20.0" [tool.vulture] exclude = ["karp/tests/"] From d74656f33ba7738a4a841203c6720a58cccd81b1 Mon Sep 17 00:00:00 2001 From: Kristoffer Andersson Date: Thu, 22 Dec 2022 11:02:13 +0100 Subject: [PATCH 4/5] fix: fix es6 prefix --- karp/search_infrastructure/__init__.py | 1 + .../elasticsearch6/es_mapping_repo.py | 136 ------------------ .../queries/es6_search_service.py | 77 +++++----- .../test_es6_insanity_checks.py | 4 +- 4 files changed, 43 insertions(+), 175 deletions(-) diff --git a/karp/search_infrastructure/__init__.py b/karp/search_infrastructure/__init__.py index 4401d2b8..d76b15be 100644 --- a/karp/search_infrastructure/__init__.py +++ b/karp/search_infrastructure/__init__.py @@ -118,6 +118,7 @@ def __init__(self, index_prefix: Optional[str] = None) -> None: self._index_prefix = index_prefix or "" @injector.provider + @injector.singleton def es6_mapping_repo( self, es: elasticsearch.Elasticsearch, diff --git a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py index 71405be2..19c8dfa3 100644 --- a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py +++ b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py @@ -292,140 +292,4 @@ def parse_prop_value(sort_map, base_name, prop_name, prop_value: Dict): return sortable_map -# def parse_sortable_fields(properties: Dict[str, Any]) -> Dict[str, List[str]]: -# for prop_name, prop_value in properties.items(): -# if prop_value["type"] in ["boolean", "date", "double", "keyword", "long", "ip"]: -# return [prop_name] - - -def _create_es_mapping(config): - es_mapping = {"dynamic": False, "properties": {}} - - fields = config["fields"] - - def recursive_field(parent_schema, parent_field_name, parent_field_def): - if parent_field_def.get("virtual", False): - fun = parent_field_def["function"] - if list(fun.keys())[0] == "multi_ref": - res_object = fun["multi_ref"]["result"] - recursive_field(parent_schema, f"v_{parent_field_name}", res_object) - if "result" in fun: - res_object = fun["result"] - recursive_field(parent_schema, f"v_{parent_field_name}", res_object) - return - if parent_field_def.get("ref"): - if "field" in parent_field_def["ref"]: - res_object = parent_field_def["ref"]["field"] - else: - res_object = {} - res_object.update(parent_field_def) - del res_object["ref"] - recursive_field(parent_schema, f"v_{parent_field_name}", res_object) - if parent_field_def["type"] != "object": - # TODO this will not work when we have user defined types, s.a. saldoid - # TODO number can be float/non-float, strings can be keyword or text in need of analyzing etc. - if parent_field_def["type"] == "integer": - mapped_type = "long" - elif parent_field_def["type"] == "number": - mapped_type = "double" - elif parent_field_def["type"] == "boolean": - mapped_type = "boolean" - elif parent_field_def["type"] == "string": - mapped_type = "text" - elif parent_field_def["type"] == "long_string": - mapped_type = "text" - else: - mapped_type = "keyword" - result = {"type": mapped_type} - if parent_field_def["type"] == "string": - result["fields"] = {"raw": {"type": "keyword"}} - else: - result = {"properties": {}} - - for child_field_name, child_field_def in parent_field_def["fields"].items(): - recursive_field(result, child_field_name, child_field_def) - - parent_schema["properties"][parent_field_name] = result - - for field_name, field_def in fields.items(): - logger.debug(f"creating mapping for field '{field_name}'") - recursive_field(es_mapping, field_name, field_def) - - return es_mapping - - -def create_es6_mapping(config: Dict) -> Dict: - mapping = _create_es_mapping(config) - mapping["settings"] = { - "analysis": { - "analyzer": { - "default": { - "char_filter": [ - "compound", - "swedish_aa", - "swedish_ae", - "swedish_oe", - ], - "filter": ["swedish_folding", "lowercase"], - "tokenizer": "standard", - } - }, - "char_filter": { - "compound": { - "pattern": "-", - "replacement": "", - "type": "pattern_replace", - }, - "swedish_aa": { - "pattern": "[Ǻǻ]", - "replacement": "å", - "type": "pattern_replace", - }, - "swedish_ae": { - "pattern": "[æÆǞǟ]", - "replacement": "ä", - "type": "pattern_replace", - }, - "swedish_oe": { - "pattern": "[ØøŒœØ̈ø̈ȪȫŐőÕõṌṍṎṏȬȭǾǿǬǭŌōṒṓṐṑ]", - "replacement": "ö", - "type": "pattern_replace", - }, - }, - "filter": { - "swedish_folding": { - "type": "icu_folding", - "unicodeSetFilter": "[^åäöÅÄÖ]", - }, - "swedish_sort": {"language": "sv", "type": "icu_collation"}, - }, - } - } - return mapping - - -class Es6MappingRepositoryUnitOfWork(IndexUnitOfWork): - def __init__( - self, - es: elasticsearch.Elasticsearch, - event_bus: EventBus, - ) -> None: - super().__init__(event_bus=event_bus) - self._index = Es6MappingRepository(es=es) - - # @classmethod - # def from_dict(cls, **kwargs): - # return cls() - - def _commit(self): - logger.debug("Calling _commit in Es6MappingRepositoryUnitOfWork") - - def rollback(self): - return super().rollback() - - @property - def repo(self) -> Es6MappingRepository: - return self._index - def _close(self): - pass diff --git a/karp/search_infrastructure/queries/es6_search_service.py b/karp/search_infrastructure/queries/es6_search_service.py index f929794d..c032905a 100644 --- a/karp/search_infrastructure/queries/es6_search_service.py +++ b/karp/search_infrastructure/queries/es6_search_service.py @@ -216,42 +216,45 @@ def search_with_query(self, query: EsQuery): if "distribution" not in result: result["distribution"] = {} result["distribution"][query.resources[i]] = response.hits.total - return result else: - s = es_dsl.Search(using=self.es, index=query.resources, doc_type="entry") - if es_query is not None: - s = s.query(es_query) - - s = s[query.from_ : query.from_ + query.size] - - if query.lexicon_stats: - s.aggs.bucket( - "distribution", "terms", field="_index", size=len(query.resources) - ) - if query.sort: - s = s.sort(*self.translate_sort_fields(query.resources, query.sort)) - elif query.sort_dict: - sort_fields = [] - for resource, sort in query.sort_dict.items(): - sort_fields.extend(self.translate_sort_fields([resource], sort)) - s = s.sort(*sort_fields) - logger.debug("s = %s", extra={"es_query s": s.to_dict()}) - response = s.execute() - - # TODO format response in a better way, because the whole response takes up too much space in the logs - # logger.debug('response = {}'.format(response.to_dict())) - - logger.debug("calling _format_result") - result = self._format_result(query.resources, response) - if query.lexicon_stats: - result["distribution"] = {} - for bucket in response.aggregations.distribution.buckets: - key = bucket["key"] - value = bucket["doc_count"] - result["distribution"][key.rsplit("_", 1)[0]] = value - - # logger.debug("return result = %s", result) - return result + result = self._extracted_from_search_with_query_47(query, es_query) + + return result + + # TODO Rename this here and in `search_with_query` + def _extracted_from_search_with_query_47(self, query, es_query): + s = es_dsl.Search(using=self.es, index=query.resources, doc_type="entry") + if es_query is not None: + s = s.query(es_query) + + s = s[query.from_ : query.from_ + query.size] + + if query.lexicon_stats: + s.aggs.bucket( + "distribution", "terms", field="_index", size=len(query.resources) + ) + if query.sort: + s = s.sort(*self.mapping_repo.translate_sort_fields(query.resources, query.sort)) + elif query.sort_dict: + sort_fields = [] + for resource, sort in query.sort_dict.items(): + sort_fields.extend(self.mapping_repo.translate_sort_fields([resource], sort)) + s = s.sort(*sort_fields) + logger.debug("s = %s", extra={"es_query s": s.to_dict()}) + response = s.execute() + + # TODO format response in a better way, because the whole response takes up too much space in the logs + # logger.debug('response = {}'.format(response.to_dict())) + + logger.debug("calling _format_result") + result = self._format_result(query.resources, response) + if query.lexicon_stats: + result["distribution"] = {} + for bucket in response.aggregations.distribution.buckets: + key = bucket["key"] + result["distribution"][key.rsplit("_", 1)[0]] = bucket["doc_count"] + + return result def search_ids(self, resource_id: str, entry_ids: str): logger.info( @@ -271,11 +274,11 @@ def statistics(self, resource_id: str, field: str) -> Iterable: s = es_dsl.Search(using=self.es, index=resource_id) s = s[:0] - if field in self.analyzed_fields[resource_id]: + if field in self.mapping_repo.analyzed_fields[resource_id]: field += ".raw" logger.debug("Statistics: analyzed fields are:") - logger.debug(json.dumps(self.analyzed_fields, indent=4)) + logger.debug(json.dumps(self.mapping_repo.analyzed_fields, indent=4)) logger.debug( "Doing aggregations on resource_id: {resource_id}, on field {field}".format( resource_id=resource_id, field=field diff --git a/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py b/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py index bf16d547..ef2cfdfe 100644 --- a/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py +++ b/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py @@ -8,8 +8,8 @@ class TestEs6Index: def test_can_instantiate_es6_index(self): with contextlib.suppress(AttributeError): - Es6Index(None) + Es6Index(es=None,mapping_repo=None) def test_can_instantiate_es6_index_uow(self): with contextlib.suppress(AttributeError): - Es6IndexUnitOfWork(None, event_bus=None, index_prefix="") + Es6IndexUnitOfWork(es=None, event_bus=None, index_prefix="", mapping_repo=None) From d9049c2102461551232fd25e6609e9eddc574c8c Mon Sep 17 00:00:00 2001 From: Kristoffer Andersson Date: Thu, 22 Dec 2022 11:05:20 +0100 Subject: [PATCH 5/5] style: after black --- .../elasticsearch6/es_mapping_repo.py | 3 --- karp/search_infrastructure/queries/es6_search_service.py | 8 ++++++-- karp/search_infrastructure/repositories/es6_indicies.py | 4 ++-- .../search_infrastructure/test_es6_insanity_checks.py | 6 ++++-- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py index 19c8dfa3..9eb9d6b2 100644 --- a/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py +++ b/karp/search_infrastructure/elasticsearch6/es_mapping_repo.py @@ -290,6 +290,3 @@ def parse_prop_value(sort_map, base_name, prop_name, prop_value: Dict): # if "fields" in prop_value: return sortable_map - - - diff --git a/karp/search_infrastructure/queries/es6_search_service.py b/karp/search_infrastructure/queries/es6_search_service.py index c032905a..7b67d727 100644 --- a/karp/search_infrastructure/queries/es6_search_service.py +++ b/karp/search_infrastructure/queries/es6_search_service.py @@ -234,11 +234,15 @@ def _extracted_from_search_with_query_47(self, query, es_query): "distribution", "terms", field="_index", size=len(query.resources) ) if query.sort: - s = s.sort(*self.mapping_repo.translate_sort_fields(query.resources, query.sort)) + s = s.sort( + *self.mapping_repo.translate_sort_fields(query.resources, query.sort) + ) elif query.sort_dict: sort_fields = [] for resource, sort in query.sort_dict.items(): - sort_fields.extend(self.mapping_repo.translate_sort_fields([resource], sort)) + sort_fields.extend( + self.mapping_repo.translate_sort_fields([resource], sort) + ) s = s.sort(*sort_fields) logger.debug("s = %s", extra={"es_query s": s.to_dict()}) response = s.execute() diff --git a/karp/search_infrastructure/repositories/es6_indicies.py b/karp/search_infrastructure/repositories/es6_indicies.py index 042726b5..3b5f7ef9 100644 --- a/karp/search_infrastructure/repositories/es6_indicies.py +++ b/karp/search_infrastructure/repositories/es6_indicies.py @@ -1,7 +1,7 @@ from datetime import datetime import logging import re -from typing import Dict, List, Optional, Any, Tuple, Union +from typing import Dict, Iterable, List, Optional, Any, Tuple, Union import elasticsearch from elasticsearch import exceptions as es_exceptions @@ -89,7 +89,7 @@ def publish_index(self, resource_id: str): ) self.es.indices.put_alias(name=alias_name, index=index_name) - def add_entries(self, resource_id: str, entries: List[IndexEntry]): + def add_entries(self, resource_id: str, entries: Iterable[IndexEntry]): index_name = self.mapping_repo.get_index_name(resource_id) index_to_es = [] for entry in entries: diff --git a/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py b/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py index ef2cfdfe..75d47a34 100644 --- a/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py +++ b/karp/tests/unit/search_infrastructure/test_es6_insanity_checks.py @@ -8,8 +8,10 @@ class TestEs6Index: def test_can_instantiate_es6_index(self): with contextlib.suppress(AttributeError): - Es6Index(es=None,mapping_repo=None) + Es6Index(es=None, mapping_repo=None) def test_can_instantiate_es6_index_uow(self): with contextlib.suppress(AttributeError): - Es6IndexUnitOfWork(es=None, event_bus=None, index_prefix="", mapping_repo=None) + Es6IndexUnitOfWork( + es=None, event_bus=None, index_prefix="", mapping_repo=None + )