diff --git a/search_service/__init__.py b/search_service/__init__.py
index f7104592..c6ce69a3 100644
--- a/search_service/__init__.py
+++ b/search_service/__init__.py
@@ -11,7 +11,7 @@
 from typing import Dict, Any  # noqa: F401
 
 from flasgger import Swagger
-from search_service.api.dashboard import SearchDashboardAPI
+from search_service.api.dashboard import SearchDashboardAPI, SearchDashboardFilterAPI
 from search_service.api.table import SearchTableAPI, SearchTableFilterAPI
 from search_service.api.user import SearchUserAPI
 from search_service.api.document import DocumentUserAPI, DocumentTableAPI, DocumentTablesAPI, DocumentUsersAPI
@@ -81,8 +81,9 @@ def create_app(*, config_module_class: str) -> Flask:
     api_bp.add_url_rule('/healthcheck', 'healthcheck', healthcheck)
     api = Api(api_bp)
     # Table Search API
-    # TODO: Rename endpoint to be more generic and accept a resource type so that logic can be re-used
+    api.add_resource(SearchTableFilterAPI, '/search_table')
+    # TODO: Rename endpoint to be more generic and accept a resource type so that logic can be re-used
     api.add_resource(SearchTableAPI, '/search')
 
     # User Search API
@@ -90,8 +91,10 @@ def create_app(*, config_module_class: str) -> Flask:
 
     # Dashboard Search API
     api.add_resource(SearchDashboardAPI, '/search_dashboard')
+    api.add_resource(SearchDashboardFilterAPI, '/search_dashboard_filter')
 
     # DocumentAPI
+    # TODO: update to handle dashboard/user and other entity use cases.
     api.add_resource(DocumentTablesAPI, '/document_table')
     api.add_resource(DocumentTableAPI, '/document_table/')

diff --git a/search_service/api/base.py b/search_service/api/base.py
new file mode 100644
index 00000000..85fd8af7
--- /dev/null
+++ b/search_service/api/base.py
@@ -0,0 +1,59 @@
+from http import HTTPStatus
+from typing import Any, Dict, Iterable  # noqa: F401
+
+from flask_restful import Resource, reqparse
+from marshmallow_annotations.ext.attrs import AttrsSchema
+
+from search_service.proxy import get_proxy_client
+
+
+class BaseFilterAPI(Resource):
+    """
+    Base Filter API for search filtering
+
+    This API should be generic enough to support every search filter use case.
+    """
+
+    def __init__(self, *, schema: AttrsSchema, index: str) -> None:
+        self.proxy = get_proxy_client()
+        self.schema = schema
+        self.index = index
+        self.parser = reqparse.RequestParser(bundle_errors=True)
+
+        self.parser.add_argument('page_index', required=False, default=0, type=int)
+        self.parser.add_argument('query_term', required=False, type=str)
+        self.parser.add_argument('search_request', type=dict)
+
+        super(BaseFilterAPI, self).__init__()
+
+    def post(self) -> Iterable[Any]:
+        """
+        Fetch search results based on the page_index, query_term, and
+        search_request dictionary posted in the request JSON.
+        :return: json payload of the schema; results can be empty if the
+        query doesn't match any documents
+        """
+        args = self.parser.parse_args(strict=True)
+        page_index = args.get('page_index')  # type: int
+
+        search_request = args.get('search_request')  # type: Dict
+        if search_request is None:
+            msg = 'The search request payload is not available in the request'
+            return {'message': msg}, HTTPStatus.BAD_REQUEST
+
+        query_term = args.get('query_term')  # type: str
+        if query_term and ':' in query_term:
+            msg = 'The query term contains an invalid character'
+            return {'message': msg}, HTTPStatus.BAD_REQUEST
+
+        try:
+            results = self.proxy.fetch_search_results_with_filter(
+                search_request=search_request,
+                query_term=query_term,
+                page_index=page_index,
+                index=self.index
+            )
+
+            return self.schema().dump(results).data, HTTPStatus.OK
+        except RuntimeError as e:
+            raise e
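A concrete resource only needs to supply its result schema and index and delegate to `BaseFilterAPI.post()`. As a sketch of the intended reuse (illustrative only: this change wires up filtering for tables and dashboards, not users, and it assumes a `SearchUserResultSchema` model exists):

```python
from http import HTTPStatus
from typing import Any, Iterable

from search_service.api.base import BaseFilterAPI
from search_service.api.user import USER_INDEX
from search_service.models.user import SearchUserResultSchema  # assumed to exist


class SearchUserFilterAPI(BaseFilterAPI):
    """Hypothetical filter resource for users, following the same pattern."""
    def __init__(self) -> None:
        super().__init__(schema=SearchUserResultSchema,
                         index=USER_INDEX)

    def post(self) -> Iterable[Any]:
        try:
            return super().post()
        except RuntimeError:
            err_msg = 'Exception encountered while processing search request'
            return {'message': err_msg}, HTTPStatus.INTERNAL_SERVER_ERROR
```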
diff --git a/search_service/api/dashboard.py b/search_service/api/dashboard.py
index 45a453cd..3df65a11 100644
--- a/search_service/api/dashboard.py
+++ b/search_service/api/dashboard.py
@@ -5,6 +5,7 @@
 from flasgger import swag_from
 from flask_restful import Resource, reqparse  # noqa: I201
 
+from search_service.api.base import BaseFilterAPI
 from search_service.exception import NotFoundException
 from search_service.models.dashboard import SearchDashboardResultSchema
 from search_service.proxy import get_proxy_client
@@ -57,3 +58,20 @@ def get(self) -> Iterable[Any]:
             err_msg = 'Exception encountered while processing search request'
             LOGGING.exception(err_msg)
             return {'message': err_msg}, HTTPStatus.INTERNAL_SERVER_ERROR
+
+
+class SearchDashboardFilterAPI(BaseFilterAPI):
+    """
+    Search Filter for Dashboard
+    """
+    def __init__(self) -> None:
+        super().__init__(schema=SearchDashboardResultSchema,
+                         index=DASHBOARD_INDEX)
+
+    @swag_from('swagger_doc/dashboard/search_dashboard_filter.yml')
+    def post(self) -> Iterable[Any]:
+        try:
+            return super().post()
+        except RuntimeError:
+            err_msg = 'Exception encountered while processing search request'
+            return {'message': err_msg}, HTTPStatus.INTERNAL_SERVER_ERROR
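A request to the new endpoint might look like the following; host and port are placeholders for a local deployment, and the filter values are examples:

```python
import requests

# Hypothetical local instance; adjust host/port for your environment.
response = requests.post(
    'http://localhost:5001/search_dashboard_filter',
    json={
        'page_index': 0,
        'query_term': 'test',
        'search_request': {
            'type': 'AND',
            'filters': {'product': ['mode']},
        },
    },
)
print(response.status_code, response.json())
```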
diff --git a/search_service/api/swagger_doc/dashboard/search_dashboard_filter.yml b/search_service/api/swagger_doc/dashboard/search_dashboard_filter.yml
new file mode 100644
index 00000000..9f781727
--- /dev/null
+++ b/search_service/api/swagger_doc/dashboard/search_dashboard_filter.yml
@@ -0,0 +1,38 @@
+Dashboard search
+This is used by the frontend API to search dashboard information.
+---
+tags:
+  - 'search_dashboard_filter'
+paths:
+  /search_dashboard_filter:
+    post:
+      summary: This is used by the frontend API to search dashboard information.
+      requestBody:
+        description: The json data passed from the frontend API to execute a search.
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              properties:
+                index:
+                  type: string
+                page_index:
+                  type: integer
+                query_term:
+                  type: string
+                search_request:
+                  type: object
+      responses:
+        200:
+          description: dashboard result information with query string
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/SearchDashboardResults'
+        500:
+          description: Exception encountered while searching
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'

diff --git a/search_service/api/swagger_doc/table/search_table_filter.yml b/search_service/api/swagger_doc/table/search_table_filter.yml
index 28537222..798300cc 100644
--- a/search_service/api/swagger_doc/table/search_table_filter.yml
+++ b/search_service/api/swagger_doc/table/search_table_filter.yml
@@ -2,7 +2,7 @@ Table search
 This is used by the frontend API to search table information.
 ---
 tags:
-  - 'search'
+  - 'search_table'
 paths:
   /search_table:
     post:
@@ -15,11 +15,10 @@ paths:
             schema:
               type: object
               properties:
+                index:
+                  type: string
                 page_index:
                   type: integer
-                  schema:
-                    type: integer
-                    default: 0
                 query_term:
                   type: string
                 search_request:

diff --git a/search_service/api/table.py b/search_service/api/table.py
index 5d317992..8761a06c 100644
--- a/search_service/api/table.py
+++ b/search_service/api/table.py
@@ -1,9 +1,10 @@
 from http import HTTPStatus
-from typing import Any, Dict, Iterable  # noqa: F401
+from typing import Any, Iterable  # noqa: F401
 
 from flask_restful import Resource, reqparse
 from flasgger import swag_from
 
+from search_service.api.base import BaseFilterAPI
 from search_service.models.table import SearchTableResultSchema
 from search_service.proxy import get_proxy_client
 
@@ -53,55 +54,18 @@ def get(self) -> Iterable[Any]:
             return {'message': err_msg}, HTTPStatus.INTERNAL_SERVER_ERROR
 
 
-class SearchTableFilterAPI(Resource):
+class SearchTableFilterAPI(BaseFilterAPI):
     """
-    Search Table API using search filtering
-
-    This API should be generic enough to support every search filter use case.
-    TODO: Deprecate the SearchTableFieldAPI for this more flexible API
+    Search Filter for table
     """
     def __init__(self) -> None:
-        self.proxy = get_proxy_client()
-
-        self.parser = reqparse.RequestParser(bundle_errors=True)
-
-        self.parser.add_argument('index', required=False, default=TABLE_INDEX, type=str)
-        self.parser.add_argument('page_index', required=False, default=0, type=int)
-        self.parser.add_argument('query_term', required=False, type=str)
-        self.parser.add_argument('search_request', type=dict)
-
-        super(SearchTableFilterAPI, self).__init__()
+        super().__init__(schema=SearchTableResultSchema,
+                         index=TABLE_INDEX)
 
     @swag_from('swagger_doc/table/search_table_filter.yml')
     def post(self) -> Iterable[Any]:
-        """
-        Fetch search results based on the page_index, query_term, and
-        search_request dictionary posted in the request JSON.
-
-        :return: list of table results. List can be empty if query
-        doesn't match any tables
-        """
-        args = self.parser.parse_args(strict=True)
-        page_index = args.get('page_index')  # type: int
-
-        search_request = args.get('search_request')  # type: Dict
-        if search_request is None:
-            msg = 'The search request payload is not available in the request'
-            return {'message': msg}, HTTPStatus.BAD_REQUEST
-
-        query_term = args.get('query_term')  # type: str
-        if ':' in query_term:
-            msg = 'The query term contains an invalid character'
-            return {'message': msg}, HTTPStatus.BAD_REQUEST
-
         try:
-            results = self.proxy.fetch_table_search_results_with_filter(
-                search_request=search_request,
-                query_term=query_term,
-                page_index=page_index,
-                index=args['index']
-            )
-            return SearchTableResultSchema().dump(results).data, HTTPStatus.OK
+            return super().post()
         except RuntimeError:
             err_msg = 'Exception encountered while processing search request'
             return {'message': err_msg}, HTTPStatus.INTERNAL_SERVER_ERROR
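For comparison, the equivalent table filter request posts the same envelope to `/search_table`; the `schema` and `tag` filter categories below correspond to the fields asserted in the proxy tests later in this diff, and the values are examples:

```python
import json

# Example request body matching the swagger schema above.
table_filter_request = {
    'page_index': 0,
    'query_term': 'ride',
    'search_request': {
        'type': 'AND',
        'filters': {
            'schema': ['core'],
            'tag': ['test-tag'],
        },
    },
}
print(json.dumps(table_filter_request, indent=2))
```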
diff --git a/search_service/proxy/atlas.py b/search_service/proxy/atlas.py
index 9566b645..08d0501a 100644
--- a/search_service/proxy/atlas.py
+++ b/search_service/proxy/atlas.py
@@ -215,11 +215,11 @@ def fetch_table_search_results(self, *,
         return SearchTableResult(total_results=approx_count,
                                  results=tables)
 
-    def fetch_table_search_results_with_filter(self, *,
-                                               query_term: str,
-                                               search_request: dict,
-                                               page_index: int = 0,
-                                               index: str = '') -> SearchTableResult:
+    def fetch_search_results_with_filter(self, *,
+                                         query_term: str,
+                                         search_request: dict,
+                                         page_index: int = 0,
+                                         index: str = '') -> SearchTableResult:
         """
         Conduct an 'Advanced Search' to narrow down search results
         with a use of filters.

diff --git a/search_service/proxy/base.py b/search_service/proxy/base.py
index 5bc32da0..4a48949f 100644
--- a/search_service/proxy/base.py
+++ b/search_service/proxy/base.py
@@ -1,5 +1,5 @@
 from abc import ABCMeta, abstractmethod
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Union
 
 from search_service.models.dashboard import SearchDashboardResult
 from search_service.models.table import SearchTableResult
@@ -45,11 +45,12 @@ def delete_document(self, *,
         pass
 
     @abstractmethod
-    def fetch_table_search_results_with_filter(self, *,
-                                               query_term: str,
-                                               search_request: dict,
-                                               page_index: int = 0,
-                                               index: str = '') -> SearchTableResult:
+    def fetch_search_results_with_filter(self, *,
+                                         query_term: str,
+                                         search_request: dict,
+                                         page_index: int = 0,
+                                         index: str = '') -> Union[SearchTableResult,
+                                                                   SearchDashboardResult]:
        pass
 
     @abstractmethod
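Both result models are constructed with the same `(total_results, results)` shape in this diff, so callers can consume the widened `Union` return type uniformly; a small illustrative helper (not part of the codebase):

```python
from typing import Union

from search_service.models.dashboard import SearchDashboardResult
from search_service.models.table import SearchTableResult


def summarize(result: Union[SearchTableResult, SearchDashboardResult]) -> str:
    # Both result models expose total_results and results, so no
    # isinstance checks are needed for simple handling.
    return f'{result.total_results} result(s)'


print(summarize(SearchTableResult(total_results=0, results=[])))
```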
diff --git a/search_service/proxy/elasticsearch.py b/search_service/proxy/elasticsearch.py
index ad6d5af8..e2a098c5 100644
--- a/search_service/proxy/elasticsearch.py
+++ b/search_service/proxy/elasticsearch.py
@@ -1,16 +1,16 @@
 import logging
 import uuid
 import itertools
-from typing import Any, List, Dict
+from typing import Any, List, Dict, Union
 
 from elasticsearch import Elasticsearch
 from elasticsearch_dsl import Search, query
 from elasticsearch.exceptions import NotFoundError
 from flask import current_app
-from amundsen_common.models.index_map import USER_INDEX_MAP
-from amundsen_common.models.index_map import TABLE_INDEX_MAP
+from amundsen_common.models.index_map import USER_INDEX_MAP, TABLE_INDEX_MAP
 
 from search_service import config
+from search_service.api.dashboard import DASHBOARD_INDEX
 from search_service.api.user import USER_INDEX
 from search_service.api.table import TABLE_INDEX
 from search_service.models.search_result import SearchResult
@@ -43,6 +43,14 @@
     'tags': Tag
 }
 
+# Mapping to translate filter categories in a request for dashboard resources
+DASHBOARD_MAPPING = {
+    'group_name': 'group_name.raw',
+    'name': 'name.raw',
+    'product': 'product',
+    'tag': 'tags',
+}
+
 
 class ElasticsearchProxy(BaseProxy):
     """
@@ -154,137 +162,6 @@ def _search_helper(self, page_index: int,
                                        model=model,
                                        search_result_model=search_result_model)
 
-    def _create_document_helper(self, data: List[Table], index: str) -> str:
-        # fetch indices that use our chosen alias (should only ever return one in a list)
-        indices = self._fetch_old_index(index)
-
-        for i in indices:
-            # build a list of elasticsearch actions for bulk upload
-            actions = self._build_index_actions(data=data, index_key=i)
-
-            # bulk create or update data
-            self._bulk_helper(actions)
-
-        return index
-
-    def _update_document_helper(self, data: List[Table], index: str) -> str:
-        # fetch indices that use our chosen alias (should only ever return one in a list)
-        indices = self._fetch_old_index(index)
-
-        for i in indices:
-            # build a list of elasticsearch actions for bulk update
-            actions = self._build_update_actions(data=data, index_key=i)
-
-            # bulk update existing documents in index
-            self._bulk_helper(actions)
-
-        return index
-
-    def _delete_document_helper(self, data: List[str], index: str) -> str:
-        # fetch indices that use our chosen alias
-        indices = self._fetch_old_index(index)
-
-        # set the document type
-        type = User.get_type() if index is USER_INDEX else Table.get_type()
-
-        for i in indices:
-            # build a list of elasticsearch actions for bulk deletion
-            actions = self._build_delete_actions(data=data, index_key=i, type=type)
-
-            # bulk delete documents in index
-            self._bulk_helper(actions)
-
-        return index
-
-    def _build_index_actions(self, data: List[Table], index_key: str) -> List[Dict[str, Any]]:
-        actions = list()
-        for item in data:
-            index_action = {'index': {'_index': index_key, '_type': item.get_type(), '_id': item.get_id()}}
-            actions.append(index_action)
-            actions.append(item.__dict__)
-        return actions
-
-    def _build_update_actions(self, data: List[Table], index_key: str) -> List[Dict[str, Any]]:
-        actions = list()
-
-        for item in data:
-            actions.append({'update': {'_index': index_key, '_type': item.get_type(), '_id': item.get_id()}})
-            actions.append({'doc': item.__dict__})
-        return actions
-
-    def _build_delete_actions(self, data: List[str], index_key: str, type: str) -> List[Dict[str, Any]]:
-        return [{'delete': {'_index': index_key, '_id': id, '_type': type}} for id in data]
-
-    def _bulk_helper(self, actions: List[Dict[str, Any]]) -> None:
-        result = self.elasticsearch.bulk(actions)
-
-        if result['errors']:
-            # ES's error messages are nested within elasticsearch objects and can
-            # fail silently if you aren't careful
-            LOGGING.error('Error during Elasticsearch bulk actions')
-            LOGGING.debug(result['items'])
-        return
-
-    def _fetch_old_index(self, alias: str) -> List[str]:
-        """
-        Retrieve all indices that are currently tied to alias
-        (Can most often expect only one index to be returned in this list)
-        :return: list of elasticsearch indices
-        """
-        try:
-            indices = self.elasticsearch.indices.get_alias(alias).keys()
-            return indices
-        except NotFoundError:
-            LOGGING.warn('Received index not found error from Elasticsearch', exc_info=True)
-
-            # create a new index if there isn't already one that is usable
-            new_index = self._create_index_helper(alias=alias)
-            return [new_index]
-
-    def _create_index_helper(self, alias: str) -> str:
-        index_key = str(uuid.uuid4())
-        mapping: str = self._get_mapping(alias=alias)
-        self.elasticsearch.indices.create(index=index_key, body=mapping)
-
-        # alias our new index
-        index_actions = {'actions': [{'add': {'index': index_key, 'alias': alias}}]}
-        self.elasticsearch.indices.update_aliases(index_actions)
-        return index_key
-
-    def _get_mapping(self, alias: str) -> str:
-        if alias is USER_INDEX:
-            return USER_INDEX_MAP
-        elif alias is TABLE_INDEX:
-            return TABLE_INDEX_MAP
-        return ''
-
-    def _search_wildcard_helper(self, field_value: str,
-                                page_index: int,
-                                client: Search,
-                                field_name: str) -> SearchResult:
-        """
-        Do a wildcard match search with the query term.
-
-        :param field_value:
-        :param page_index:
-        :param client:
-        :param field_name
-        :param query_name: name of query
-        :return:
-        """
-        if field_value and field_name:
-            d = {
-                "wildcard": {
-                    field_name: field_value
-                }
-            }
-            q = query.Q(d)
-            client = client.query(q)
-
-        return self._get_search_result(page_index=page_index,
-                                       client=client,
-                                       model=Table)
-
     @timer_with_counter
     def fetch_table_search_results(self, *,
                                    query_term: str,
@@ -341,14 +218,23 @@ def get_model_by_index(index: str) -> Any:
             return Table
         elif index == USER_INDEX:
             return User
+        elif index == DASHBOARD_INDEX:
+            return Dashboard
 
         raise Exception('Unable to map given index to a valid model')
 
     @staticmethod
-    def parse_filters(filter_list: Dict) -> str:
+    def parse_filters(filter_list: Dict,
+                      index: str) -> str:
         query_list = []  # type: List[str]
+        if index == TABLE_INDEX:
+            mapping = TABLE_MAPPING
+        elif index == DASHBOARD_INDEX:
+            mapping = DASHBOARD_MAPPING
+        else:
+            raise Exception(f'index {index} does not exist or does not support search filters')
         for category, item_list in filter_list.items():
-            mapped_category = TABLE_MAPPING.get(category)
+            mapped_category = mapping.get(category)
             if mapped_category is None:
                 LOGGING.warn(f'Unsupported filter category: {category} passed in list of filters')
             elif item_list is '' or item_list == ['']:
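As a concrete illustration of what `parse_filters` produces for the dashboard index, inferred from the table assertions in `tests/unit/proxy/test_elasticsearch.py` further down this diff (values are made up):

```python
# Hypothetical dashboard filter payload and the string parse_filters would
# be expected to return for it: each category is translated through
# DASHBOARD_MAPPING and the per-category clauses are joined with AND.
filter_list = {
    'product': ['mode'],
    'name': ['test_dashboard'],
}
expected = 'product:(mode) AND name.raw:(test_dashboard)'
```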
@@ -376,19 +262,33 @@ def validate_filter_values(search_request: dict) -> Any:
         return True
 
     @staticmethod
-    def parse_query_term(query_term: str) -> str:
+    def parse_query_term(query_term: str,
+                         index: str) -> str:
         # TODO: Might be some issue with using wildcard & underscore
         # https://discuss.elastic.co/t/wildcard-search-with-underscore-is-giving-no-result/114010/8
-        return f'(name:(*{query_term}*) OR name:({query_term}) ' \
-               f'OR schema:(*{query_term}*) OR schema:({query_term}) ' \
-               f'OR description:(*{query_term}*) OR description:({query_term}) ' \
-               f'OR column_names:(*{query_term}*) OR column_names:({query_term}) ' \
-               f'OR column_descriptions:(*{query_term}*) OR column_descriptions:({query_term}))'
+        if index == TABLE_INDEX:
+            query_term = f'(name:(*{query_term}*) OR name:({query_term}) ' \
+                         f'OR schema:(*{query_term}*) OR schema:({query_term}) ' \
+                         f'OR description:(*{query_term}*) OR description:({query_term}) ' \
+                         f'OR column_names:(*{query_term}*) OR column_names:({query_term}) ' \
+                         f'OR column_descriptions:(*{query_term}*) OR column_descriptions:({query_term}))'
+        elif index == DASHBOARD_INDEX:
+            query_term = f'(name:(*{query_term}*) OR name:({query_term}) ' \
+                         f'OR group_name:(*{query_term}*) OR group_name:({query_term}) ' \
+                         f'OR query_names:(*{query_term}*) OR query_names:({query_term}) ' \
+                         f'OR description:(*{query_term}*) OR description:({query_term}) ' \
+                         f'OR tags:(*{query_term}*) OR tags:({query_term}) ' \
+                         f'OR badges:(*{query_term}*) OR badges:({query_term}) ' \
+                         f'OR product:(*{query_term}*) OR product:({query_term}))'
+        else:
+            raise Exception(f'index {index} does not exist or does not support search filters')
+        return query_term
 
     @classmethod
     def convert_query_json_to_query_dsl(self, *,
                                         search_request: dict,
-                                        query_term: str) -> str:
+                                        query_term: str,
+                                        index: str) -> str:
         """
         Convert the generic query json to query DSL
         e.g
@@ -415,6 +315,8 @@ def convert_query_json_to_query_dsl(self, *,
         ```
 
         :param search_request:
+        :param query_term:
+        :param index: table_index, dashboard_index
         :return: The search engine query DSL
         """
         filter_list = search_request.get('filters')
@@ -425,10 +327,12 @@ def convert_query_json_to_query_dsl(self, *,
             if valid_filters is False:
                 raise Exception(
                     'The search filters contain invalid characters and thus cannot be handled by ES')
-            query_dsl = self.parse_filters(filter_list)
+            query_dsl = self.parse_filters(filter_list,
+                                           index)
 
         if query_term:
-            add_query = self.parse_query_term(query_term)
+            add_query = self.parse_query_term(query_term,
+                                              index)
 
         if not query_dsl and not add_query:
             raise Exception('Unable to convert parameters to valid query dsl')
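Tracing a dashboard request end to end through `convert_query_json_to_query_dsl` (illustrative values; the filter string is rendered first and joined to the expanded query term with ` AND `, as the table tests below assert for `TABLE_INDEX`):

```python
# Hypothetical inputs and the DSL string the conversion would be
# expected to produce for the dashboard index.
search_request = {'type': 'AND', 'filters': {'product': ['mode']}}
query_term = 'test'
expected_dsl = (
    'product:(mode) AND '
    '(name:(*test*) OR name:(test) '
    'OR group_name:(*test*) OR group_name:(test) '
    'OR query_names:(*test*) OR query_names:(test) '
    'OR description:(*test*) OR description:(test) '
    'OR tags:(*test*) OR tags:(test) '
    'OR badges:(*test*) OR badges:(test) '
    'OR product:(*test*) OR product:(test))'
)
```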
@@ -444,11 +348,12 @@
         return result
 
     @timer_with_counter
-    def fetch_table_search_results_with_filter(self, *,
-                                               query_term: str,
-                                               search_request: dict,
-                                               page_index: int = 0,
-                                               index: str = '') -> SearchTableResult:
+    def fetch_search_results_with_filter(self, *,
+                                         query_term: str,
+                                         search_request: dict,
+                                         page_index: int = 0,
+                                         index: str = '') -> Union[SearchDashboardResult,
+                                                                   SearchTableResult]:
         """
         Query Elasticsearch and return results as list of Table objects
         :param search_request: A json representation of search request
@@ -458,17 +363,25 @@ def fetch_table_search_results_with_filter(self, *,
         """
         current_index = index if index else \
             current_app.config.get(config.ELASTICSEARCH_INDEX_KEY, DEFAULT_ES_INDEX)  # type: str
+        if current_index == DASHBOARD_INDEX:
+            search_model = SearchDashboardResult  # type: Any
+        elif current_index == TABLE_INDEX:
+            search_model = SearchTableResult
+        else:
+            raise RuntimeError(f'index {index} does not support search filters')
         if not search_request:
             # return empty result for blank query term
-            return SearchTableResult(total_results=0, results=[])
+            return search_model(total_results=0, results=[])
 
         try:
             query_string = self.convert_query_json_to_query_dsl(search_request=search_request,
-                                                                query_term=query_term)  # type: str
+                                                                query_term=query_term,
+                                                                index=current_index)  # type: str
         except Exception as e:
             LOGGING.exception(e)
             # return nothing if any exception is thrown under the hood
-            return SearchTableResult(total_results=0, results=[])
+            return search_model(total_results=0, results=[])
+
         s = Search(using=self.elasticsearch, index=current_index)
 
         query_name = {
@@ -490,7 +403,7 @@ def fetch_table_search_results_with_filter(self, *,
                                        client=s,
                                        query_name=query_name,
                                        model=model,
-                                       search_result_model=SearchTableResult)
+                                       search_result_model=search_model)
 
     @timer_with_counter
     def fetch_user_search_results(self, *,
@@ -529,46 +442,6 @@ def fetch_user_search_results(self, *,
                                        query_name=query_name,
                                        model=User)
 
-    @timer_with_counter
-    def create_document(self, *, data: List[Table], index: str) -> str:
-        """
-        Creates new index in elasticsearch, then routes traffic to the new index
-        instead of the old one
-        :return: str
-        """
-
-        if not index:
-            raise Exception('Index cant be empty for creating document')
-        if not data:
-            LOGGING.warn('Received no data to upload to Elasticsearch')
-            return ''
-
-        return self._create_document_helper(data=data, index=index)
-
-    @timer_with_counter
-    def update_document(self, *, data: List[Table], index: str) -> str:
-        """
-        Updates the existing index in elasticsearch
-        :return: str
-        """
-        if not index:
-            raise Exception('Index cant be empty for updating document')
-        if not data:
-            LOGGING.warn('Received no data to upload to Elasticsearch')
-            return ''
-
-        return self._update_document_helper(data=data, index=index)
-
-    @timer_with_counter
-    def delete_document(self, *, data: List[str], index: str) -> str:
-        if not index:
-            raise Exception('Index cant be empty for deleting document')
-        if not data:
-            LOGGING.warn('Received no data to upload to Elasticsearch')
-            return ''
-
-        return self._delete_document_helper(data=data, index=index)
-
     @timer_with_counter
     def fetch_dashboard_search_results(self, *,
                                        query_term: str,
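A minimal sketch of calling the renamed proxy method directly (requires an application context with a configured Elasticsearch proxy, as set up in the unit tests; the index name and filter values are examples):

```python
from search_service.proxy import get_proxy_client

# Assumes an active Flask app context, as in the tests below.
proxy = get_proxy_client()
results = proxy.fetch_search_results_with_filter(
    query_term='test',
    search_request={'type': 'AND', 'filters': {'product': ['mode']}},
    page_index=0,
    index='dashboard_search_index',
)
print(results.total_results)
```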
@@ -615,3 +488,147 @@ def fetch_dashboard_search_results(self, *,
                                        query_name=query_name,
                                        model=Dashboard,
                                        search_result_model=SearchDashboardResult)
+
+    # The following methods are related to the document API, which still needs
+    # updating to handle dashboard/user and other entity use cases.
+    @timer_with_counter
+    def create_document(self, *, data: List[Table], index: str) -> str:
+        """
+        Creates new index in elasticsearch, then routes traffic to the new index
+        instead of the old one
+        :return: str
+        """
+
+        if not index:
+            raise Exception('Index cant be empty for creating document')
+        if not data:
+            LOGGING.warn('Received no data to upload to Elasticsearch')
+            return ''
+
+        return self._create_document_helper(data=data, index=index)
+
+    @timer_with_counter
+    def update_document(self, *, data: List[Table], index: str) -> str:
+        """
+        Updates the existing index in elasticsearch
+        :return: str
+        """
+        if not index:
+            raise Exception('Index cant be empty for updating document')
+        if not data:
+            LOGGING.warn('Received no data to upload to Elasticsearch')
+            return ''
+
+        return self._update_document_helper(data=data, index=index)
+
+    @timer_with_counter
+    def delete_document(self, *, data: List[str], index: str) -> str:
+        if not index:
+            raise Exception('Index cant be empty for deleting document')
+        if not data:
+            LOGGING.warn('Received no data to upload to Elasticsearch')
+            return ''
+
+        return self._delete_document_helper(data=data, index=index)
+
+    def _create_document_helper(self, data: List[Table], index: str) -> str:
+        # fetch indices that use our chosen alias (should only ever return one in a list)
+        indices = self._fetch_old_index(index)
+
+        for i in indices:
+            # build a list of elasticsearch actions for bulk upload
+            actions = self._build_index_actions(data=data, index_key=i)
+
+            # bulk create or update data
+            self._bulk_helper(actions)
+
+        return index
+
+    def _update_document_helper(self, data: List[Table], index: str) -> str:
+        # fetch indices that use our chosen alias (should only ever return one in a list)
+        indices = self._fetch_old_index(index)
+
+        for i in indices:
+            # build a list of elasticsearch actions for bulk update
+            actions = self._build_update_actions(data=data, index_key=i)
+
+            # bulk update existing documents in index
+            self._bulk_helper(actions)
+
+        return index
+
+    def _delete_document_helper(self, data: List[str], index: str) -> str:
+        # fetch indices that use our chosen alias
+        indices = self._fetch_old_index(index)
+
+        # set the document type
+        type = User.get_type() if index is USER_INDEX else Table.get_type()
+
+        for i in indices:
+            # build a list of elasticsearch actions for bulk deletion
+            actions = self._build_delete_actions(data=data, index_key=i, type=type)
+
+            # bulk delete documents in index
+            self._bulk_helper(actions)
+
+        return index
+
+    def _build_index_actions(self, data: List[Table], index_key: str) -> List[Dict[str, Any]]:
+        actions = list()
+        for item in data:
+            index_action = {'index': {'_index': index_key, '_type': item.get_type(), '_id': item.get_id()}}
+            actions.append(index_action)
+            actions.append(item.__dict__)
+        return actions
+
+    def _build_update_actions(self, data: List[Table], index_key: str) -> List[Dict[str, Any]]:
+        actions = list()
+
+        for item in data:
+            actions.append({'update': {'_index': index_key, '_type': item.get_type(), '_id': item.get_id()}})
+            actions.append({'doc': item.__dict__})
+        return actions
+
+    def _build_delete_actions(self, data: List[str], index_key: str, type: str) -> List[Dict[str, Any]]:
+        return [{'delete': {'_index': index_key, '_id': id, '_type': type}} for id in data]
+
+    def _bulk_helper(self, actions: List[Dict[str, Any]]) -> None:
+        result = self.elasticsearch.bulk(actions)
+
+        if result['errors']:
+            # ES's error messages are nested within elasticsearch objects and can
+            # fail silently if you aren't careful
+            LOGGING.error('Error during Elasticsearch bulk actions')
+            LOGGING.debug(result['items'])
+        return
+
+    def _fetch_old_index(self, alias: str) -> List[str]:
+        """
+        Retrieve all indices that are currently tied to alias
+        (Can most often expect only one index to be returned in this list)
+        :return: list of elasticsearch indices
+        """
+        try:
+            indices = self.elasticsearch.indices.get_alias(alias).keys()
+            return indices
+        except NotFoundError:
+            LOGGING.warn('Received index not found error from Elasticsearch', exc_info=True)
+
+            # create a new index if there isn't already one that is usable
+            new_index = self._create_index_helper(alias=alias)
+            return [new_index]
+
+    def _create_index_helper(self, alias: str) -> str:
+        def _get_mapping(alias: str) -> str:
+            if alias is USER_INDEX:
+                return USER_INDEX_MAP
+            elif alias is TABLE_INDEX:
+                return TABLE_INDEX_MAP
+            return ''
+        index_key = str(uuid.uuid4())
+        mapping: str = _get_mapping(alias=alias)
+        self.elasticsearch.indices.create(index=index_key, body=mapping)
+
+        # alias our new index
+        index_actions = {'actions': [{'add': {'index': index_key, 'alias': alias}}]}
+        self.elasticsearch.indices.update_aliases(index_actions)
+        return index_key
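For reference, the action list handed to `self.elasticsearch.bulk()` interleaves an action header with its payload document, one pair per item; the `_id`/`_type` values and document fields here are made up:

```python
# Shape of the list produced by _build_index_actions for two documents;
# '_type' and '_id' come from each model's get_type()/get_id().
actions = [
    {'index': {'_index': 'table_search_index', '_type': 'table', '_id': 'db://cluster.schema/one'}},
    {'name': 'one', 'schema': 'schema', 'description': 'first table'},
    {'index': {'_index': 'table_search_index', '_type': 'table', '_id': 'db://cluster.schema/two'}},
    {'name': 'two', 'schema': 'schema', 'description': 'second table'},
]
```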
diff --git a/setup.py b/setup.py
index 03b47aba..5a6da958 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 from setuptools import setup, find_packages
 
-__version__ = '2.3.4'
+__version__ = '2.4.0'
 
 requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
 with open(requirements_path) as requirements_file:
diff --git a/tests/unit/api/dashboard/test_search_dashboard_filter.py b/tests/unit/api/dashboard/test_search_dashboard_filter.py
new file mode 100644
index 00000000..506447a6
--- /dev/null
+++ b/tests/unit/api/dashboard/test_search_dashboard_filter.py
@@ -0,0 +1,61 @@
+import unittest
+
+from http import HTTPStatus
+from mock import patch, MagicMock
+
+from search_service import create_app
+
+
+class SearchDashboardFilterTest(unittest.TestCase):
+    def setUp(self) -> None:
+        self.app = create_app(config_module_class='search_service.config.LocalConfig')
+        self.app_context = self.app.app_context()
+        self.app_context.push()
+        self.mock_index = 'dashboard_search_index'
+        self.mock_term = 'test'
+        self.mock_page_index = 0
+        self.mock_search_request = {
+            'type': 'AND',
+            'filters': {
+                'product': ['mode']
+            }
+        }
+        self.url = '/search_dashboard_filter'
+
+    def tear_down(self) -> None:
+        self.app_context.pop()
+
+    @patch('search_service.api.dashboard.reqparse.RequestParser')
+    @patch('search_service.api.base.get_proxy_client')
+    def test_post(self, get_proxy: MagicMock, RequestParser: MagicMock) -> None:
+        mock_proxy = get_proxy()
+        RequestParser().parse_args.return_value = dict(index=self.mock_index,
+                                                       page_index=self.mock_page_index,
+                                                       query_term=self.mock_term,
+                                                       search_request=self.mock_search_request)
+
+        self.app.test_client().post(self.url)
+        mock_proxy.fetch_search_results_with_filter.assert_called_with(index=self.mock_index,
+                                                                       page_index=self.mock_page_index,
+                                                                       query_term=self.mock_term,
+                                                                       search_request=self.mock_search_request)
+
+    @patch('search_service.api.dashboard.reqparse.RequestParser')
+    @patch('search_service.api.base.get_proxy_client')
+    def test_post_return_400_if_no_search_request(self, get_proxy: MagicMock, RequestParser: MagicMock) -> None:
+        RequestParser().parse_args.return_value = dict(index=self.mock_index,
+                                                       query_term=self.mock_term)
+
+        response = self.app.test_client().post(self.url)
+        self.assertEqual(response.status_code, HTTPStatus.BAD_REQUEST)
+
+    @patch('search_service.api.dashboard.reqparse.RequestParser')
+    @patch('search_service.api.base.get_proxy_client')
+    def test_post_return_400_if_bad_query_term(self, get_proxy: MagicMock, RequestParser: MagicMock) -> None:
+        RequestParser().parse_args.return_value = dict(index=self.mock_index,
+                                                       page_index=self.mock_page_index,
+                                                       query_term='name:bad_syntax',
+                                                       search_request=self.mock_search_request)
+
+        response = self.app.test_client().post(self.url)
+        self.assertEqual(response.status_code, HTTPStatus.BAD_REQUEST)
diff --git a/tests/unit/api/table/test_search_table_filter.py b/tests/unit/api/table/test_search_table_filter.py
index 7e12e8be..64b91194 100644
--- a/tests/unit/api/table/test_search_table_filter.py
+++ b/tests/unit/api/table/test_search_table_filter.py
@@ -11,7 +11,7 @@ def setUp(self) -> None:
         self.app = create_app(config_module_class='search_service.config.LocalConfig')
         self.app_context = self.app.app_context()
         self.app_context.push()
-        self.mock_index = 'fake_index'
+        self.mock_index = 'table_search_index'
         self.mock_term = 'test'
         self.mock_page_index = 0
         self.mock_search_request = {
@@ -25,8 +25,8 @@ def setUp(self) -> None:
     def tear_down(self) -> None:
         self.app_context.pop()
 
-    @patch('search_service.api.document.reqparse.RequestParser')
-    @patch('search_service.api.table.get_proxy_client')
+    @patch('search_service.api.table.reqparse.RequestParser')
+    @patch('search_service.api.base.get_proxy_client')
     def test_post(self, get_proxy: MagicMock, RequestParser: MagicMock) -> None:
         mock_proxy = get_proxy()
         RequestParser().parse_args.return_value = dict(index=self.mock_index,
@@ -35,13 +35,13 @@ def test_post(self, get_proxy: MagicMock, RequestParser: MagicMock) -> None:
                                                        search_request=self.mock_search_request)
 
         self.app.test_client().post(self.url)
-        mock_proxy.fetch_table_search_results_with_filter.assert_called_with(index=self.mock_index,
-                                                                             page_index=self.mock_page_index,
-                                                                             query_term=self.mock_term,
-                                                                             search_request=self.mock_search_request)
+        mock_proxy.fetch_search_results_with_filter.assert_called_with(index=self.mock_index,
+                                                                       page_index=self.mock_page_index,
+                                                                       query_term=self.mock_term,
+                                                                       search_request=self.mock_search_request)
 
-    @patch('search_service.api.document.reqparse.RequestParser')
-    @patch('search_service.api.table.get_proxy_client')
+    @patch('search_service.api.table.reqparse.RequestParser')
+    @patch('search_service.api.base.get_proxy_client')
     def test_post_return_400_if_no_search_request(self, get_proxy: MagicMock, RequestParser: MagicMock) -> None:
         RequestParser().parse_args.return_value = dict(index=self.mock_index,
                                                        query_term=self.mock_term)
@@ -49,8 +49,8 @@ def test_post_return_400_if_no_search_request(self, get_proxy: MagicMock, Reques
         response = self.app.test_client().post(self.url)
         self.assertEqual(response.status_code, HTTPStatus.BAD_REQUEST)
 
-    @patch('search_service.api.document.reqparse.RequestParser')
-    @patch('search_service.api.table.get_proxy_client')
+    @patch('search_service.api.table.reqparse.RequestParser')
+    @patch('search_service.api.base.get_proxy_client')
     def test_post_return_400_if_bad_query_term(self, get_proxy: MagicMock, RequestParser: MagicMock) -> None:
         RequestParser().parse_args.return_value = dict(index=self.mock_index,
                                                        page_index=self.mock_page_index,
diff --git a/tests/unit/proxy/test_elasticsearch.py b/tests/unit/proxy/test_elasticsearch.py
index 2cb3e3ff..25e4f1a6 100644
--- a/tests/unit/proxy/test_elasticsearch.py
+++ b/tests/unit/proxy/test_elasticsearch.py
@@ -315,14 +315,14 @@ def test_search_table_filter(self, mock_search: MagicMock) -> None:
                 'tag': ['test-tag'],
             }
         }
-        resp = self.es_proxy.fetch_table_search_results_with_filter(search_request=search_request, query_term='test')
+        resp = self.es_proxy.fetch_search_results_with_filter(search_request=search_request, query_term='test')
 
         self.assertEquals(resp.total_results, expected.total_results)
         self.assertIsInstance(resp.results[0], Table)
         self.assertDictEqual(vars(resp.results[0]), vars(expected.results[0]))
 
     def test_search_table_filter_return_no_results_if_no_search_request(self) -> None:
-        resp = self.es_proxy.fetch_table_search_results_with_filter(search_request=None, query_term='test')
+        resp = self.es_proxy.fetch_search_results_with_filter(search_request=None, query_term='test')
 
         self.assertEquals(resp.total_results, 0)
         self.assertEquals(resp.results, [])
@@ -334,8 +334,8 @@ def test_search_table_filter_return_no_results_if_dsl_conversion_error(self) ->
         }
         with patch.object(self.es_proxy, 'convert_query_json_to_query_dsl') as mock:
             mock.side_effect = MagicMock(side_effect=Exception('Test'))
-            resp = self.es_proxy.fetch_table_search_results_with_filter(search_request=search_request,
-                                                                        query_term='test')
+            resp = self.es_proxy.fetch_search_results_with_filter(search_request=search_request,
+                                                                  query_term='test')
 
             self.assertEquals(resp.total_results, 0)
             self.assertEquals(resp.results, [])
@@ -362,13 +362,15 @@ def test_parse_filters_return_results(self) -> None:
                           "AND name.raw:(*amundsen*) " \
                           "AND column_names.raw:(*ds*) " \
                           "AND tags:(test-tag)"
-        self.assertEquals(self.es_proxy.parse_filters(filter_list), expected_result)
+        self.assertEquals(self.es_proxy.parse_filters(filter_list,
+                                                      index=TABLE_INDEX), expected_result)
 
     def test_parse_filters_return_no_results(self) -> None:
         filter_list = {
             'unsupported_category': ['fake']
         }
-        self.assertEquals(self.es_proxy.parse_filters(filter_list), '')
+        self.assertEquals(self.es_proxy.parse_filters(filter_list,
+                                                      index=TABLE_INDEX), '')
 
     def test_validate_wrong_filters_values(self) -> None:
         search_request = {
@@ -400,7 +402,8 @@ def test_parse_query_term(self) -> None:
                           "schema:(test) OR description:(*test*) OR description:(test) OR " \
                           "column_names:(*test*) OR column_names:(test) OR " \
                           "column_descriptions:(*test*) OR column_descriptions:(test))"
-        self.assertEquals(self.es_proxy.parse_query_term(term), expected_result)
+        self.assertEquals(self.es_proxy.parse_query_term(term,
+                                                         index=TABLE_INDEX), expected_result)
 
     def test_convert_query_json_to_query_dsl_term_and_filters(self) -> None:
         term = 'test'
@@ -416,10 +419,11 @@ def test_convert_query_json_to_query_dsl_term_and_filters(self) -> None:
             'filters': test_filters
         }
 
-        expected_result = self.es_proxy.parse_filters(test_filters) + " AND " + \
-            self.es_proxy.parse_query_term(term)
+        expected_result = self.es_proxy.parse_filters(test_filters, index=TABLE_INDEX) + " AND " + \
+            self.es_proxy.parse_query_term(term, index=TABLE_INDEX)
         ret_result = self.es_proxy.convert_query_json_to_query_dsl(search_request=search_request,
-                                                                   query_term=term)
+                                                                   query_term=term,
+                                                                   index=TABLE_INDEX)
         self.assertEquals(ret_result, expected_result)
 
     def test_convert_query_json_to_query_dsl_no_term(self) -> None:
@@ -431,9 +435,11 @@ def test_convert_query_json_to_query_dsl_no_term(self) -> None:
             'type': 'AND',
             'filters': test_filters
         }
-        expected_result = self.es_proxy.parse_filters(test_filters)
+        expected_result = self.es_proxy.parse_filters(test_filters,
+                                                      index=TABLE_INDEX)
         ret_result = self.es_proxy.convert_query_json_to_query_dsl(search_request=search_request,
-                                                                   query_term=term)
+                                                                   query_term=term,
+                                                                   index=TABLE_INDEX)
         self.assertEquals(ret_result, expected_result)
 
     def test_convert_query_json_to_query_dsl_no_filters(self) -> None:
@@ -442,9 +448,11 @@ def test_convert_query_json_to_query_dsl_no_filters(self) -> None:
         term = 'test'
         search_request = {
             'type': 'AND',
             'filters': {}
         }
-        expected_result = self.es_proxy.parse_query_term(term)
+        expected_result = self.es_proxy.parse_query_term(term,
+                                                         index=TABLE_INDEX)
         ret_result = self.es_proxy.convert_query_json_to_query_dsl(search_request=search_request,
                                                                    query_term=term)
+                                                                   index=TABLE_INDEX)
         self.assertEquals(ret_result, expected_result)
 
     def test_convert_query_json_to_query_dsl_raise_exception_no_term_or_filters(self) -> None: