This repository has been archived by the owner on Feb 22, 2023. It is now read-only.

Add comments to pagination and dead link mask and validation #870

Merged
merged 1 commit on Aug 19, 2022
49 changes: 49 additions & 0 deletions api/catalog/api/controllers/search_controller.py
@@ -61,13 +61,37 @@ def _paginate_with_dead_link_mask(
start = len(query_mask)
end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))
else:
# query_mask is a list of 0s and 1s where a 0 indicates that the result at
# that position for the given query is an invalid (dead) link. If we
# accumulate the query mask, each index then holds the number of live
# results you will get back when you query that deeply.
# We then compute the start and end index _of the results_ in ES based
# on the number of results that we expect to be valid according to the
# query mask.
# If we're requesting `page=2 page_size=3` and the mask is [0, 1, 0, 1, 0, 1],
# then we know that we have to skip at least the first six results of the
# overall query to get past the first page of 3 valid results. The "end" of
# the query then follows the same pattern to reach the number of valid results
# required to fill the requested page. If the mask is not deep enough to
# account for the entire range, then we follow the typical assumption used
# when a mask is not available: the end should be `page * page_size / 0.5`
# (i.e., double `page * page_size`).
accu_query_mask = list(accumulate(query_mask))
start = 0
if page > 1:
try:
# find the index at which we can skip N valid results where N = all
# the results that would be skipped to arrive at the start of the
# requested page
# This will effectively be the index at which we have the number of
# previous valid results + 1 because we don't want to include the
# last valid result from the previous page
start = accu_query_mask.index(page_size * (page - 1) + 1)
except ValueError:
# Cannot fail because of the check on line 56 which verifies that
# the query mask already includes at least enough masked valid
# results to fulfill the requested page size
start = accu_query_mask.index(page_size * (page - 1)) + 1

if page_size * page > sum(query_mask):
end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))
else:
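
To make the arithmetic in the comments above concrete, here is a minimal, runnable sketch of the same start/end computation. It is an illustration rather than the project's actual function: the value of `DEAD_LINK_RATIO` and the final `else` branch (looking the end index up in the accumulated mask) are assumptions based on the visible fragments.

```python
from itertools import accumulate
from math import ceil

DEAD_LINK_RATIO = 0.5  # assumed value; the real constant lives in the API settings


def paginate_with_mask(query_mask, page, page_size):
    """Illustrate the start/end computation described in the comments above."""
    accu_query_mask = list(accumulate(query_mask))
    start = 0
    if page > 1:
        try:
            # Index at which the count of valid results first exceeds the
            # results consumed by the previous pages.
            start = accu_query_mask.index(page_size * (page - 1) + 1)
        except ValueError:
            # The mask holds exactly the valid results needed for the previous
            # pages; start just past the last valid result it records.
            start = accu_query_mask.index(page_size * (page - 1)) + 1
    if page_size * page > sum(query_mask):
        # Mask is not deep enough; fall back to the dead-link-ratio estimate.
        end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))
    else:
        # Assumed symmetric lookup for the end of the requested page.
        end = accu_query_mask.index(page_size * page) + 1
    return start, end


# Worked example from the comments: page=2, page_size=3, mask=[0, 1, 0, 1, 0, 1].
# The mask only contains 3 valid results, so start=6 (skip the whole mask) and
# end falls back to ceil(6 / 0.5) = 12.
print(paginate_with_mask([0, 1, 0, 1, 0, 1], page=2, page_size=3))  # (6, 12)
```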
@@ -112,6 +136,8 @@ def _post_process_results(
results, perform image validation, and route certain thumbnails through our
proxy.

Keeps making new query requests until it is able to fill the page size.

:param s: The Elasticsearch Search object.
:param start: The start of the result slice.
:param end: The end of the result slice.
@@ -139,6 +165,27 @@ def _post_process_results(
return None

if len(results) < page_size:
"""
The variables in this function get updated in an interesting way.
Here is an example of that for a typical query. Note that ``end``
increases but start stays the same. This has the effect of slowly
increasing the size of the query we send to Elasticsearch with the
goal of backfilling the results until we have enough valid (live)
results to fulfill the requested page size.

```
page_size: 20
page: 1

start: 0
end: 40 (DEAD_LINK_RATIO applied)

end gets updated to end + end/2 = 60

end = 60 + 30 = 90
end = 90 + 45 = 135
```
"""
end += int(end / 2)
if start + end > ELASTICSEARCH_MAX_RESULT_WINDOW:
return results
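
As a standalone illustration of the progression the docstring above walks through (assumed constants, not the real recursion), the window requested from Elasticsearch grows by half of `end` on each retry while `start` stays fixed, until either the page can be filled or the result window is exhausted:

```python
from math import ceil

# Assumed values, for illustration only.
DEAD_LINK_RATIO = 0.5
ELASTICSEARCH_MAX_RESULT_WINDOW = 10_000

page, page_size = 1, 20
start = 0
end = ceil(page_size * page / (1 - DEAD_LINK_RATIO))  # 40, as in the docstring

# Reproduce the first few steps of the growth sequence: 40 -> 60 -> 90 -> 135 -> ...
windows = [end]
while start + end <= ELASTICSEARCH_MAX_RESULT_WINDOW and len(windows) < 5:
    end += int(end / 2)
    windows.append(end)

print(windows)  # [40, 60, 90, 135, 202]
```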
@@ -149,6 +196,7 @@ def _post_process_results(
return _post_process_results(
s, start, end, page_size, search_response, request, filter_dead
)

return results[:page_size]


@@ -331,6 +379,7 @@ def search(
log.info(pprint.pprint(search_response.to_dict()))
except RequestError as e:
raise ValueError(e)

results = _post_process_results(
s, start, end, page_size, search_response, request, filter_dead
)
13 changes: 13 additions & 0 deletions api/catalog/api/utils/validate_images.py
@@ -44,16 +44,19 @@ def validate_images(query_hash, start_slice, results, image_urls):

logger.debug("starting validation")
start_time = time.time()

# Pull matching images from the cache.
redis = django_redis.get_redis_connection("default")
cached_statuses = _get_cached_statuses(redis, image_urls)
logger.debug(f"len(cached_statuses)={len(cached_statuses)}")

# Anything that isn't in the cache needs to be validated via HEAD request.
to_verify = {}
for idx, url in enumerate(image_urls):
if cached_statuses[idx] is None:
to_verify[url] = idx
logger.debug(f"len(to_verify)={len(to_verify)}")

reqs = (
grequests.head(
url, headers=HEADERS, allow_redirects=False, timeout=2, verify=False
@@ -63,6 +66,8 @@ def validate_images(query_hash, start_slice, results, image_urls):
verified = grequests.map(reqs, exception_handler=_validation_failure)
# Cache newly verified image statuses.
to_cache = {}
# relies on the consistency of the order returned by `dict.keys()`, which
# is guaranteed as of Python 3.7 (dicts preserve insertion order)
for idx, url in enumerate(to_verify.keys()):
cache_key = CACHE_PREFIX + url
if verified[idx]:
@@ -74,9 +79,11 @@ def validate_images(query_hash, start_slice, results, image_urls):

thirty_minutes = 60 * 30
twenty_four_hours_seconds = 60 * 60 * 24

pipe = redis.pipeline()
if len(to_cache) > 0:
pipe.mset(to_cache)

for key, status in to_cache.items():
# Cache successful links for a day, and broken links for 120 days.
if status == 200:
@@ -104,6 +111,7 @@ def validate_images(query_hash, start_slice, results, image_urls):

# Create a new dead link mask
new_mask = [1] * len(results)

# Delete broken images from the search results response.
for idx, _ in enumerate(cached_statuses):
del_idx = len(cached_statuses) - idx - 1
@@ -120,12 +128,17 @@ def validate_images(query_hash, start_slice, results, image_urls):
f"id={results[del_idx]['identifier']} "
f"status={status} "
)
# remove the result, mutating in place
del results[del_idx]
# update the result's position in the mask to indicate it is dead
new_mask[del_idx] = 0

# Merge and cache the new mask
mask = get_query_mask(query_hash)
if mask:
# keep the leading part of the mask, which represents results that come before
# the ones we've verified this time around, and overwrite everything after it
# with our new validation mask.
new_mask = mask[:start_slice] + new_mask
save_query_mask(query_hash, new_mask)
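
For illustration, a small worked example of the merge above with hypothetical values: the slice of the previously cached mask before `start_slice` is preserved, and the freshly computed mask overwrites everything from `start_slice` onward.

```python
# Hypothetical values, for illustration only.
old_mask = [1, 0, 1, 1, 0, 1, 0, 1]  # cached mask from earlier validations
start_slice = 3                      # offset of the results validated this round
fresh_mask = [1, 1, 0]               # mask built from this round's validation

# Same merge as above: keep the old prefix, overwrite the rest.
merged = old_mask[:start_slice] + fresh_mask
print(merged)  # [1, 0, 1, 1, 1, 0]
```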
