From 75a18a3ead61380d1f182a85ed59636a2789734f Mon Sep 17 00:00:00 2001
From: sarayourfriend <24264157+sarayourfriend@users.noreply.github.com>
Date: Tue, 9 Aug 2022 14:00:35 -0400
Subject: [PATCH] Add comments to pagination and dead link mask and validation

---
 .../api/controllers/search_controller.py | 49 +++++++++++++++++++
 api/catalog/api/utils/validate_images.py | 13 +++++
 2 files changed, 62 insertions(+)

diff --git a/api/catalog/api/controllers/search_controller.py b/api/catalog/api/controllers/search_controller.py
index da7981431..e48d60fb8 100644
--- a/api/catalog/api/controllers/search_controller.py
+++ b/api/catalog/api/controllers/search_controller.py
@@ -61,13 +61,37 @@ def _paginate_with_dead_link_mask(
         start = len(query_mask)
         end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))
     else:
+        # query_mask is a list of 0s and 1s where a 0 indicates that the result
+        # at that position for the given query is a dead (invalid) link.
+        # Accumulating the query mask gives us, at each index, the number of
+        # live results we will get back when we query that deeply.
+        # We then compute the start and end index _of the results_ in ES based
+        # on how many results the query mask tells us will be valid.
+        # If we're requesting `page=2 page_size=3` and the mask is [0, 1, 0, 1, 0, 1],
+        # then we know that we have to _start_ after the first six results of the
+        # overall query to skip the first page of 3 valid results. The "end" of the
+        # query then follows the same pattern to reach the number of valid results
+        # required to fill the requested page. If the mask is not deep enough to
+        # account for the entire range, we follow the typical assumption we make
+        # when a mask is not available: the end should be `page * page_size / 0.5`
+        # (i.e., double the number of results needed for the page).
         accu_query_mask = list(accumulate(query_mask))
         start = 0
         if page > 1:
             try:
+                # Find the index at which we can skip N valid results, where N is
+                # the number of valid results that come before the start of the
+                # requested page.
+                # This is effectively the first index holding the number of
+                # previous valid results + 1, because we don't want to include the
+                # last valid result from the previous page.
                 start = accu_query_mask.index(page_size * (page - 1) + 1)
             except ValueError:
+                # This `index` call cannot fail because of the check on line 56,
+                # which verifies that the query mask already records at least
+                # enough valid results to fulfill the requested page size.
                 start = accu_query_mask.index(page_size * (page - 1)) + 1
+
         if page_size * page > sum(query_mask):
             end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))
         else:
@@ -112,6 +136,8 @@ def _post_process_results(
     results, perform image validation, and route certain thumbnails through
     our proxy.
 
+    Keeps making new query requests until it is able to fill the page size.
+
     :param s: The Elasticsearch Search object.
     :param start: The start of the result slice.
     :param end: The end of the result slice.
@@ -139,6 +165,27 @@ def _post_process_results(
             return None
 
     if len(results) < page_size:
+        """
+        The variables in this function get updated in an interesting way.
+        Here is an example of that for a typical query. Note that ``end``
+        increases but ``start`` stays the same. This has the effect of slowly
+        increasing the size of the query we send to Elasticsearch with the
+        goal of backfilling the results until we have enough valid (live)
+        results to fulfill the requested page size.
+
+        ```
+        page_size: 20
+        page: 1
+
+        start: 0
+        end: 40 (DEAD_LINK_RATIO applied)
+
+        end gets updated to end + end/2 = 60
+
+        end = 60 + 30 = 90
+        end = 90 + 45 = 135
+        ```
+        """
         end += int(end / 2)
         if start + end > ELASTICSEARCH_MAX_RESULT_WINDOW:
             return results
@@ -149,6 +196,7 @@
         return _post_process_results(
             s, start, end, page_size, search_response, request, filter_dead
         )
+
     return results[:page_size]
 
 
@@ -331,6 +379,7 @@ def search(
         log.info(pprint.pprint(search_response.to_dict()))
     except RequestError as e:
         raise ValueError(e)
+
     results = _post_process_results(
         s, start, end, page_size, search_response, request, filter_dead
     )
diff --git a/api/catalog/api/utils/validate_images.py b/api/catalog/api/utils/validate_images.py
index 24756273e..123627d78 100644
--- a/api/catalog/api/utils/validate_images.py
+++ b/api/catalog/api/utils/validate_images.py
@@ -44,16 +44,19 @@ def validate_images(query_hash, start_slice, results, image_urls):
 
     logger.debug("starting validation")
     start_time = time.time()
+    # Pull matching images from the cache.
     redis = django_redis.get_redis_connection("default")
     cached_statuses = _get_cached_statuses(redis, image_urls)
     logger.debug(f"len(cached_statuses)={len(cached_statuses)}")
+    # Anything that isn't in the cache needs to be validated via HEAD request.
     to_verify = {}
     for idx, url in enumerate(image_urls):
         if cached_statuses[idx] is None:
             to_verify[url] = idx
     logger.debug(f"len(to_verify)={len(to_verify)}")
+
     reqs = (
         grequests.head(
             url, headers=HEADERS, allow_redirects=False, timeout=2, verify=False
@@ -63,6 +66,8 @@ def validate_images(query_hash, start_slice, results, image_urls):
     verified = grequests.map(reqs, exception_handler=_validation_failure)
     # Cache newly verified image statuses.
     to_cache = {}
+    # This relies on the consistency of the order returned by `dict.keys()`,
+    # which is guaranteed as of Python 3.7.
     for idx, url in enumerate(to_verify.keys()):
         cache_key = CACHE_PREFIX + url
         if verified[idx]:
@@ -74,9 +79,11 @@ def validate_images(query_hash, start_slice, results, image_urls):
 
     thirty_minutes = 60 * 30
     twenty_four_hours_seconds = 60 * 60 * 24
+
     pipe = redis.pipeline()
     if len(to_cache) > 0:
         pipe.mset(to_cache)
+
     for key, status in to_cache.items():
         # Cache successful links for a day, and broken links for 120 days.
         if status == 200:
@@ -104,6 +111,7 @@ def validate_images(query_hash, start_slice, results, image_urls):
 
     # Create a new dead link mask
     new_mask = [1] * len(results)
+    # Delete broken images from the search results response.
     for idx, _ in enumerate(cached_statuses):
         del_idx = len(cached_statuses) - idx - 1
@@ -120,12 +128,17 @@ def validate_images(query_hash, start_slice, results, image_urls):
                 f"id={results[del_idx]['identifier']} "
                 f"status={status} "
             )
+            # Remove the dead result, mutating the results list in place.
             del results[del_idx]
+            # Mark the result's position in the mask as dead.
             new_mask[del_idx] = 0
 
     # Merge and cache the new mask
     mask = get_query_mask(query_hash)
     if mask:
+        # Keep the leading part of the existing mask, which covers results that
+        # come before the ones we verified this time around, and overwrite
+        # everything after it with our new validation mask.
         new_mask = mask[:start_slice] + new_mask
     save_query_mask(query_hash, new_mask)
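
Reviewer note (not part of the patch): the accumulated-mask arithmetic that the new comments in `_paginate_with_dead_link_mask` describe can be sketched on its own. The snippet below is only an illustration under assumptions: `paginate_with_mask_sketch` is a made-up name, and `DEAD_LINK_RATIO = 0.5` is hard-coded here rather than read from the API's settings.

```
from itertools import accumulate
from math import ceil

DEAD_LINK_RATIO = 0.5  # assumed value; the real constant lives in the API settings


def paginate_with_mask_sketch(query_mask, page, page_size):
    """Return the (start, end) slice of Elasticsearch results for the requested page."""
    if not query_mask:
        # No mask yet: assume DEAD_LINK_RATIO of the results are dead and over-fetch.
        start = 0
        end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))
    elif page_size * page > sum(query_mask):
        # The mask is too shallow to cover the page: start after everything
        # already validated and over-fetch the rest.
        start = len(query_mask)
        end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))
    else:
        # accu_query_mask[i] is the number of live results available when
        # querying (i + 1) results deep.
        accu_query_mask = list(accumulate(query_mask))
        start = 0
        if page > 1:
            skipped_valid = page_size * (page - 1)
            try:
                # First index with one more live result than the previous pages
                # need, so the previous page's last live result is excluded.
                start = accu_query_mask.index(skipped_valid + 1)
            except ValueError:
                # Defensive: the mask holds exactly `skipped_valid` live
                # results, so start right after them.
                start = accu_query_mask.index(skipped_valid) + 1
        end = accu_query_mask.index(page_size * page) + 1
    return start, end


# 0 = dead link, 1 = live link
mask = [0, 1, 1, 0, 1, 1, 1, 1]
print(paginate_with_mask_sketch(mask, page=2, page_size=3))  # (5, 8)
```

Slicing the Elasticsearch results with `[5:8]` in this example yields exactly the three live results that make up page 2, which is the behaviour the new comments describe.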
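
Likewise, the ``end`` progression in the new `_post_process_results` docstring (40 → 60 → 90 → 135) can be reproduced with a small loop. Again this is only an illustration: `ELASTICSEARCH_MAX_RESULT_WINDOW` is assumed here to be Elasticsearch's default of 10,000, not read from the module.

```
from math import ceil

DEAD_LINK_RATIO = 0.5  # assumed, as above
ELASTICSEARCH_MAX_RESULT_WINDOW = 10_000  # assumed default window size

page, page_size = 1, 20
start = 0
end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))

# Each retry widens only the end of the slice by half of its current size,
# mirroring `end += int(end / 2)` in `_post_process_results`.
for _ in range(4):
    if start + end > ELASTICSEARCH_MAX_RESULT_WINDOW:
        break
    print(end)  # 40, 60, 90, 135
    end += int(end / 2)
```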