diff --git a/api/catalog/api/controllers/elasticsearch/related.py b/api/catalog/api/controllers/elasticsearch/related.py index e3b0decc6..df28513b6 100644 --- a/api/catalog/api/controllers/elasticsearch/related.py +++ b/api/catalog/api/controllers/elasticsearch/related.py @@ -1,6 +1,3 @@ -import json -import logging - from elasticsearch_dsl import Search from catalog.api.controllers.elasticsearch.utils import ( @@ -11,14 +8,10 @@ ) -parent_logger = logging.getLogger(__name__) - - def related_media(uuid, index, filter_dead): """ Given a UUID, find related search results. """ - logger = parent_logger.getChild("related_media") search_client = Search(using="default", index=index) # Convert UUID to sequential ID. @@ -40,20 +33,8 @@ def related_media(uuid, index, filter_dead): start, end = get_query_slice(s, page_size, page, filter_dead) s = s[start:end] response = s.execute() - logger.debug( - "executed related query " - f"es_took_ms={response.took} " - f"query={json.dumps(s.to_dict())} " - f"response={json.dumps(response.to_dict())} " - ) - results = post_process_results(s, start, end, page_size, response, filter_dead) result_count, _ = get_result_and_page_count(response, results, page_size) - logger.debug( - "finished post processing and returning related " - f"result_count={result_count} " - f"results={json.dumps(results)}" - ) return results, result_count diff --git a/api/catalog/api/controllers/elasticsearch/search.py b/api/catalog/api/controllers/elasticsearch/search.py index d3e009c23..925d5ba4b 100644 --- a/api/catalog/api/controllers/elasticsearch/search.py +++ b/api/catalog/api/controllers/elasticsearch/search.py @@ -1,8 +1,9 @@ from __future__ import annotations import json -import logging -from typing import List, Literal, Optional, Tuple +import logging as log +import pprint +from typing import List, Literal, Tuple from django.conf import settings @@ -20,32 +21,19 @@ from catalog.api.serializers.media_serializers import MediaSearchRequestSerializer -parent_logger = logging.getLogger(__name__) - - class FieldMapping: """ Establishes a mapping between a field in ``MediaSearchRequestSerializer`` and the Elasticsearch index for a media. """ - def __init__(self, serializer_field: str, es_field: Optional[str] = None): + def __init__(self, serializer_field: str, es_field: str = None): self.serializer_field: str = serializer_field """the name of the field in ``MediaSearchRequestSerializer``""" self.es_field: str = es_field or serializer_field """the name of the field in the Elasticsearch index""" - def __str__(self): - return ( - "FieldMapping(" - f"serializer_field={self.serializer_field}, " - f"es_field={self.es_field})" - ) - - def __repr__(self): - return str(self) - def _quote_escape(query_string: str) -> str: """ @@ -81,19 +69,13 @@ def _apply_filter( :param behaviour: whether to accept (``filter``) or reject (``exclude``) the hit :return: the modified search query """ - logger = parent_logger.getChild("_apply_filter") + search_params = query_serializer.data if mapping.serializer_field in search_params: filters = [] for arg in search_params[mapping.serializer_field].split(","): filters.append(Q("term", **{mapping.es_field: arg})) method = getattr(s, behaviour) # can be ``s.filter`` or ``s.exclude`` - logger.debug( - "applying filter " - f"behaviour={behaviour} " - f"mapping={mapping} " - f"filters={json.dumps(list(map(lambda f: f.to_dict(), filters)))} " - ) return method("bool", should=filters) else: return s @@ -114,13 +96,6 @@ def perform_search( :return: the list of search results with the page and result count """ - logger = parent_logger.getChild("perform_search") - logger.info( - "searching with " - f"query_serializer.data={json.dumps(query_serializer.data)} " - f"index={index} " - f"(hashed) ip={ip} " - ) s = Search(using="default", index=index) search_params = query_serializer.data @@ -146,7 +121,6 @@ def perform_search( # Exclude mature content if not search_params["mature"]: - logger.debug("excluding mature") s = s.exclude("term", mature=True) # Exclude sources with ``filter_content`` enabled s = exclude_filtered_providers(s) @@ -156,16 +130,15 @@ def perform_search( search_fields = ["tags.name", "title", "description"] if "q" in search_params: - escaped_query = _quote_escape(search_params["q"]) - logger.info(f"searching with query term escaped_query={escaped_query}") + query = _quote_escape(search_params["q"]) s = s.query( "simple_query_string", - query=escaped_query, + query=query, fields=search_fields, default_operator="AND", ) # Boost exact matches - quotes_stripped = escaped_query.replace('"', "") + quotes_stripped = query.replace('"', "") exact_match_boost = Q( "simple_query_string", fields=["title"], @@ -174,7 +147,6 @@ def perform_search( ) s.query = Q("bool", must=s.query, should=exact_match_boost) else: - logger.info("searching without query term") query_bases = ["creator", "title", ("tags", "tags.name")] for query_basis in query_bases: if isinstance(query_basis, tuple): @@ -183,21 +155,12 @@ def perform_search( serializer_field = es_field = query_basis if serializer_field in search_params: value = _quote_escape(search_params[serializer_field]) - logger.debug( - "adding query for " - f"value={value} " - f"es_field={es_field}" - f"serializer_field={serializer_field}" - ) s = s.query("simple_query_string", fields=[es_field], query=value) if settings.USE_RANK_FEATURES: feature_boost = {"standardized_popularity": 10000} rank_queries = [] for field, boost in feature_boost.items(): - logger.debug( - "applying ranked features " f"field={field} " f"boost={boost} " - ) rank_queries.append(Q("rank_feature", field=field, boost=boost)) s.query = Q("bool", must=s.query or EMPTY_QUERY, should=rank_queries) @@ -220,16 +183,15 @@ def perform_search( s = s[start:end] try: - logger.info("executing query") + if settings.VERBOSE_ES_RESPONSE: + log.info(pprint.pprint(s.to_dict())) search_response = s.execute() - logger.debug( - "executed query " - f"es_took_ms={search_response.took} " - f"query={json.dumps(s.to_dict())} " - f"response={json.dumps(search_response.to_dict())} " + log.info( + f"query={json.dumps(s.to_dict())}," f" es_took_ms={search_response.took}" ) + if settings.VERBOSE_ES_RESPONSE: + log.info(pprint.pprint(search_response.to_dict())) except RequestError as e: - logger.error("encountered error executing query", exc_info=True) raise ValueError(e) results = post_process_results( @@ -244,17 +206,4 @@ def perform_search( result_count, page_count = get_result_and_page_count( search_response, results, search_params["page_size"] ) - - dumpable_results = ( - results.to_dict() - if isinstance(results, Hit) - else list(map(lambda r: r.to_dict(), results)) - ) - - logger.debug( - "finished post processing and returning " - f"result_count={result_count} " - f"page_count={page_count} " - f"results={json.dumps(dumpable_results)}" - ) return results, page_count, result_count diff --git a/api/catalog/api/controllers/elasticsearch/stats.py b/api/catalog/api/controllers/elasticsearch/stats.py index bd7a163bc..f37e54812 100644 --- a/api/catalog/api/controllers/elasticsearch/stats.py +++ b/api/catalog/api/controllers/elasticsearch/stats.py @@ -1,5 +1,4 @@ -import json -import logging +import logging as log from typing import Literal from django.core.cache import cache @@ -8,9 +7,6 @@ from elasticsearch_dsl import Search -parent_logger = logging.getLogger(__name__) - - SOURCE_CACHE_TIMEOUT = 60 * 20 # seconds @@ -22,21 +18,14 @@ def get_stats(index: Literal["image", "audio"]): :param index: the Elasticsearch index name :return: a dictionary mapping sources to the count of their media items """ - logger = parent_logger.getChild("get_stats") + source_cache_name = "sources-" + index try: - logger.debug(f"fetching source cache key={source_cache_name}") sources = cache.get(key=source_cache_name) if sources is not None: - logger.debug(f"cache hit! returning sources={json.dumps(sources)}") return sources - else: - logger.debug("cache missed") except ValueError: - # TODO: Improve error handling here. - # What failed? Why? Do we need to address it? - # Is this a critical issue? Why is this a "warning"? - logger.warning("Source cache fetch failed") + log.warning("Source cache fetch failed") # Don't increase `size` without reading this issue first: # https://github.com/elastic/elasticsearch/issues/18838 @@ -57,8 +46,6 @@ def get_stats(index: Literal["image", "audio"]): sources = {} if sources: - logger.info(f"putting sources to cache key={source_cache_name}") cache.set(key=source_cache_name, timeout=SOURCE_CACHE_TIMEOUT, value=sources) - logger.debug(f"sources={json.dumps(sources)}") return sources diff --git a/api/catalog/api/controllers/elasticsearch/utils.py b/api/catalog/api/controllers/elasticsearch/utils.py index 5ba55f111..cb0bb6e12 100644 --- a/api/catalog/api/controllers/elasticsearch/utils.py +++ b/api/catalog/api/controllers/elasticsearch/utils.py @@ -1,5 +1,3 @@ -import json -import logging from itertools import accumulate from math import ceil from typing import List, Optional, Tuple @@ -14,9 +12,6 @@ from catalog.api.utils.validate_images import validate_images -parent_logger = logging.getLogger(__name__) - - FILTER_CACHE_TIMEOUT = 30 DEAD_LINK_RATIO = 1 / 2 ELASTICSEARCH_MAX_RESULT_WINDOW = 10000 @@ -30,24 +25,20 @@ def exclude_filtered_providers(s: Search) -> Search: :param s: the search query to issue to Elasticsearch :return: the modified search query """ - logger = parent_logger.getChild("exclude_filtered_providers") + filter_cache_key = "filtered_providers" filtered_providers = cache.get(key=filter_cache_key) if filtered_providers is None: filtered_providers = ContentProvider.objects.filter(filter_content=True).values( "provider_identifier" ) - logger.debug("adding filtered providers to cache") cache.set( key=filter_cache_key, timeout=FILTER_CACHE_TIMEOUT, value=filtered_providers, ) - - logger.info(f'filtered_providers={",".join(filtered_providers)}') if len(filtered_providers) != 0: to_exclude = [f["provider_identifier"] for f in filtered_providers] - logger.info("auto-excluding filtered providers") s = s.exclude("terms", provider=to_exclude) return s @@ -64,71 +55,26 @@ def paginate_with_dead_link_mask( :param page: The page number. :return: Tuple of start and end. """ - logger = parent_logger.getChild("paginate_with_dead_link_mask") query_hash = get_query_hash(s) query_mask = get_query_mask(query_hash) - logger.debug( - f"page_size={page_size} " - f"page={page} " - f"query_hash={query_hash} " - f'query_mask={",".join(map(str, query_mask))} ' - ) if not query_mask: start = 0 end = ceil(page_size * page / (1 - DEAD_LINK_RATIO)) - logger.debug( - "empty query_mask " - f"page_size={page_size} " - f"page={page} " - f"start={start} " - f"end={end} " - ) - return start, end - - # TODO: What does this request_size mean vs the one below - # that doesn't subtract 1 from the page? Why do we subtract - # 1 here and the other time we do not? - request_size = page_size * (page - 1) - query_mask_sum = sum(query_mask) - if request_size > query_mask_sum: + elif page_size * (page - 1) > sum(query_mask): start = len(query_mask) end = ceil(page_size * page / (1 - DEAD_LINK_RATIO)) - logger.debug( - "request size exceeds query mask sum (branch 1)" - f"request_size={request_size} " - f"sum(query_mask)={query_mask_sum} " - f"start={start} " - f"end={end} " - ) - return start, end - - accu_query_mask = list(accumulate(query_mask)) - logger.debug(f"accu_query_mask={accu_query_mask} ") - start = 0 - if page > 1: - caught = False - try: - idx = page_size * (page - 1) + 1 - start = accu_query_mask.index(idx) - except ValueError: - caught = True - idx = page_size * (page - 1) - start = accu_query_mask.index(idx) + 1 - - logger.debug("shifted page using " f"idx={idx} " f"caught?={caught} ") - - request_size = page_size * page - if request_size > query_mask_sum: - logger.debug( - "request size exceeds query mask sum (branch 2) " - f"request_size={request_size} " - f"query_mask_sum={query_mask_sum} " - ) - end = ceil(page_size * page / (1 - DEAD_LINK_RATIO)) else: - end = accu_query_mask.index(page_size * page) + 1 - - logger.debug("third branch returnining " f"start={start} " f"end={end} ") + accu_query_mask = list(accumulate(query_mask)) + start = 0 + if page > 1: + try: + start = accu_query_mask.index(page_size * (page - 1) + 1) + except ValueError: + start = accu_query_mask.index(page_size * (page - 1)) + 1 + if page_size * page > sum(query_mask): + end = ceil(page_size * page / (1 - DEAD_LINK_RATIO)) + else: + end = accu_query_mask.index(page_size * page) + 1 return start, end @@ -138,37 +84,24 @@ def get_query_slice( """ Select the start and end of the search results for this query. """ - logger = parent_logger.getChild("get_query_slice") if filter_dead: start_slice, end_slice = paginate_with_dead_link_mask(s, page_size, page) else: # Paginate search query. start_slice = page_size * (page - 1) end_slice = page_size * page - logger.debug( - "paginating without filtering for dead links " - f"page_size={page_size} " - f"page={page} " - f"start_slice={start_slice} " - f"end_slice={end_slice} " - ) if start_slice + end_slice > ELASTICSEARCH_MAX_RESULT_WINDOW: - raise ValueError( - "Deep pagination is not allowed. " - f"Window totalled {start_slice + end_slice}." - ) - - logger.info(f"start_slice={start_slice} end_slice={end_slice}") + raise ValueError("Deep pagination is not allowed.") return start_slice, end_slice def post_process_results( - s, start, end, page_size, search_results, filter_dead, depth: int = 0 + s, start, end, page_size, search_results, filter_dead ) -> List[Hit]: """ After fetching the search results from the back end, iterate through the - results filtering dead links. After filtering links, continue accruing additional - results until the requested page_size of validated results is reached. + results, perform image validation, and route certain thumbnails through our + proxy. :param s: The Elasticsearch Search object. :param start: The start of the result slice. @@ -178,8 +111,6 @@ def post_process_results( :param filter_dead: Whether images should be validated. :return: List of results. """ - logger = parent_logger.getChild("post_process_results") - logger.info(f"post processing results depth={depth}") results = [] to_validate = [] for res in search_results: @@ -198,20 +129,10 @@ def post_process_results( return results s = s[start:end] - logger.info(f"executing additional backfill query depth={depth}") search_response = s.execute() - # don't log the query off of ``s`` because it's identical to the already - # logged query elsewhere in the logs, just with different start/end. - logger.debug( - "exectued additional backfill query" - f"start={start} " - f"end={end} " - f"es_took_ms={search_response.took} " - f"response={json.dumps(search_response.to_dict())} " - ) return post_process_results( - s, start, end, page_size, search_response, filter_dead, depth=depth + 1 + s, start, end, page_size, search_response, filter_dead ) return results[:page_size] @@ -227,22 +148,13 @@ def get_result_and_page_count( :param results: The list of filtered result Hits. :return: Result and page count. """ - logger = parent_logger.getChild("get_result_and_page_count") result_count = response_obj.hits.total.value natural_page_count = int(result_count / page_size) if natural_page_count % page_size != 0: - logger.debug("incrementing natural_page_count") natural_page_count += 1 last_allowed_page = int((5000 + page_size / 2) / page_size) page_count = min(natural_page_count, last_allowed_page) if len(results) < page_size and page_count == 0: - logger.debug(f"setting result_count to len(results)={len(results)}") result_count = len(results) - logger.debug( - f"result_count={result_count} " - f"page_count={page_count} " - f"natural_page_count={natural_page_count} " - f"last_allowed_page={last_allowed_page}" - ) return result_count, page_count