diff --git a/api/test/api_live_integration.py b/api/test/api_live_integration.py deleted file mode 100644 index 22412f20301..00000000000 --- a/api/test/api_live_integration.py +++ /dev/null @@ -1,473 +0,0 @@ -""" -These are the LEGACY API integration tests. - -**Do not add further tests here. New tests should be added in v1_integration_test.** - -End-to-end API tests. Can be used to verify a live deployment is functioning as -designed. Run with the `pytest -s` command from this directory. -""" - -import json -import os -import uuid - -import pytest -import requests - -from api.constants.licenses import LICENSE_GROUPS -from api.models import Image -from api.utils.watermark import watermark - - -API_URL = os.getenv("INTEGRATION_TEST_URL", "http://localhost:8000") -known_apis = { - "http://localhost:8000": "LOCAL", - "https://api.openverse.engineering": "PRODUCTION", - "https://api-dev.openverse.engineering": "TESTING", -} - - -def setup_module(): - if API_URL in known_apis: - print(f"\n\033[1;31;40mTesting {known_apis[API_URL]} environment") - - -@pytest.fixture -def search_fixture(): - response = requests.get(f"{API_URL}/image/search?q=honey", verify=False) - assert response.status_code == 200 - parsed = json.loads(response.text) - return parsed - - -def test_search_quotes(): - """Test that a response is given even if the user messes up quote matching.""" - - response = requests.get(f'{API_URL}/image/search?q="test"', verify=False) - assert response.status_code == 200 - - -def test_search(search_fixture): - assert search_fixture["result_count"] > 0 - - -def test_search_consistency(): - """ - Ensure that no duplicates appear in the first few pages of a search query. - - Elasticsearch sometimes reaches an inconsistent state, which causes search - results to appear differently upon page refresh. This can also introduce - image duplicates in subsequent pages. 
- """ - - n_pages = 5 - searches = { - requests.get(f"{API_URL}/image/search?q=honey;page={page}", verify=False) - for page in range(1, n_pages) - } - - images = set() - for response in searches: - parsed = json.loads(response.text) - for result in parsed["results"]: - image_id = result["id"] - assert image_id not in images - images.add(image_id) - - -def test_image_detail(search_fixture): - test_id = search_fixture["results"][0]["id"] - response = requests.get(f"{API_URL}/image/{test_id}", verify=False) - assert response.status_code == 200 - - -def test_image_delete_invalid_creds(search_fixture): - test_id = search_fixture["results"][0]["id"] - should_fail = requests.delete( - f"{API_URL}/image/{test_id}", auth=("invalid", "credentials"), verify=False - ) - assert should_fail.status_code == 401 - - -def test_image_delete(search_fixture): - test_id = search_fixture["results"][0]["id"] - response = requests.delete( - f"{API_URL}/image/{test_id}", - auth=("continuous_integration", "deploy"), - verify=False, - ) - assert response.status_code == 204 - deleted_response = requests.get(f"{API_URL}/image/{test_id}") - assert deleted_response.status_code == 404 - - -@pytest.fixture -def link_shortener_fixture(search_fixture): - link_to_shorten = search_fixture["results"][0]["detail"] - payload = {"full_url": link_to_shorten} - response = requests.post(f"{API_URL}/link", json=payload, verify=False) - assert response.status_code == 200 - return json.loads(response.text) - - -def test_link_shortener_create(link_shortener_fixture): - assert "shortened_url" in link_shortener_fixture - - -def test_link_shortener_resolve(link_shortener_fixture): - path = link_shortener_fixture["shortened_url"].split("/")[-1] - response = requests.get( - f"{API_URL}/link/{path}", allow_redirects=False, verify=False - ) - assert response.status_code == 301 - - -def test_stats(): - response = requests.get(f"{API_URL}/statistics/image", verify=False) - parsed_response = json.loads(response.text) - 
assert response.status_code == 200 - num_images = 0 - provider_count = 0 - for pair in parsed_response: - image_count = pair["image_count"] - num_images += int(image_count) - provider_count += 1 - assert num_images > 0 - assert provider_count > 0 - - -@pytest.mark.skip(reason="Disabled feature") -@pytest.fixture -def test_list_create(search_fixture): - payload = { - "title": "INTEGRATION TEST", - "images": [search_fixture["results"][0]["id"]], - } - response = requests.post(API_URL + "/list", json=payload, verify=False) - parsed_response = json.loads(response.text) - assert response.status_code == 201 - return parsed_response - - -@pytest.mark.skip(reason="Disabled feature") -def test_list_detail(test_list_create): - list_slug = test_list_create["url"].split("/")[-1] - response = requests.get(f"{API_URL}/list/{list_slug}", verify=False) - assert response.status_code == 200 - - -@pytest.mark.skip(reason="Disabled feature") -def test_list_delete(test_list_create): - list_slug = test_list_create["url"].split("/")[-1] - token = test_list_create["auth"] - headers = {"Authorization": f"Token {token}"} - response = requests.delete( - f"{API_URL}/list/{list_slug}", headers=headers, verify=False - ) - assert response.status_code == 204 - - -def test_license_type_filtering(): - """Ensure that multiple license type filters interact together correctly.""" - - commercial = LICENSE_GROUPS["commercial"] - modification = LICENSE_GROUPS["modification"] - commercial_and_modification = set.intersection(modification, commercial) - response = requests.get( - f"{API_URL}/image/search?q=honey<=commercial,modification", verify=False - ) - parsed = json.loads(response.text) - for result in parsed["results"]: - assert result["license"].upper() in commercial_and_modification - - -def test_single_license_type_filtering(): - commercial = LICENSE_GROUPS["commercial"] - response = requests.get( - f"{API_URL}/image/search?q=honey<=commercial", verify=False - ) - parsed = json.loads(response.text) 
- for result in parsed["results"]: - assert result["license"].upper() in commercial - - -def test_specific_license_filter(): - response = requests.get(f"{API_URL}/image/search?q=honey&li=by", verify=False) - parsed = json.loads(response.text) - for result in parsed["results"]: - assert result["license"] == "by" - - -def test_creator_quotation_grouping(): - """Test that quotation marks can be used to narrow down search results.""" - - no_quotes = json.loads( - requests.get( - f"{API_URL}/image/search?creator=claude%20monet", verify=False - ).text - ) - quotes = json.loads( - requests.get( - f'{API_URL}/image/search?creator="claude%20monet"', verify=False - ).text - ) - # Did quotation marks actually narrow down the search? - assert len(no_quotes["results"]) > len(quotes["results"]) - # Did we find only Claude Monet works, or did his lesser known brother Jim - # Monet sneak into the results? - for result in quotes["results"]: - assert "Claude Monet" in result["creator"] - - -@pytest.fixture -def test_oauth2_registration(): - payload = { - "name": f"INTEGRATION TEST APPLICATION {uuid.uuid4()}", - "description": "A key for testing the OAuth2 registration process.", - "email": "example@example.org", - } - response = requests.post(f"{API_URL}/oauth2/register/", json=payload, verify=False) - parsed_response = json.loads(response.text) - assert response.status_code == 201 - return parsed_response - - -def test_oauth2_token_exchange(test_oauth2_registration): - client_id = test_oauth2_registration["client_id"] - client_secret = test_oauth2_registration["client_secret"] - token_exchange_request = ( - f"client_id={client_id}" - f"&client_secret={client_secret}" - f"&grant_type=client_credentials" - ) - headers = { - "content-type": "application/x-www-form-urlencoded", - "cache-control": "no-cache", - } - response = json.loads( - requests.post( - f"{API_URL}/oauth2/token/", - data=token_exchange_request, - headers=headers, - verify=False, - ).text - ) - assert "access_token" 
in response - - -def test_watermark_preserves_exif(): - img_with_exif = ( - "https://raw.githubusercontent.com/ianare/exif-samples/" - "master/jpg/Canon_PowerShot_S40.jpg" - ) - info = { - "title": "test", - "creator": "test", - "license": "test", - "license_version": "test", - } - _, exif = watermark(image_url=img_with_exif, info=info) - assert exif is not None - - img_no_exif = ( - "https://creativecommons.org/wp-content/uploads/" - "2019/03/9467312978_64cd5d2f3b_z.jpg" - ) - _, no_exif = watermark(image_url=img_no_exif, info=info) - assert no_exif is None - - -def test_attribution(): - """ - Check that the API includes an attribution string. - - Since there are some works where the title or creator is not known, the format of - the attribution string can need to be tweaked slightly. - """ - - title_and_creator_missing = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=None, - creator=None, - license="by", - license_version="3.0", - ) - print("\nAttribution examples:\n") - print(title_and_creator_missing.attribution) - assert "This work" in title_and_creator_missing.attribution - - title = "A foo walks into a bar" - creator_missing = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=title, - creator=None, - license="by", - license_version="3.0", - ) - print(creator_missing.attribution) - assert title in creator_missing.attribution - assert "by " not in creator_missing.attribution - - creator = "John Doe" - title_missing = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=None, - creator=creator, - license="by", - license_version="3.0", - ) - print(title_missing.attribution) - assert creator in title_missing.attribution - assert "This work" in title_missing.attribution - - all_data_present = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=title, - creator=creator, - license="by", - license_version="3.0", - ) - print(all_data_present.attribution) - assert title in 
all_data_present.attribution - assert creator in all_data_present.attribution - - -def test_browse_by_provider(): - response = requests.get(f"{API_URL}/image/browse/behance", verify=False) - assert response.status_code == 200 - parsed = json.loads(response.text) - assert parsed["result_count"] > 0 - - -def test_extension_filter(): - response = requests.get(f"{API_URL}/image/search?q=honey&extension=jpg") - parsed = json.loads(response.text) - for result in parsed["results"]: - assert ".jpg" in result["url"] - - -@pytest.fixture -def search_factory(): - """Allow passing url parameters along with a search request.""" - - def _parameterized_search(**kwargs): - response = requests.get(f"{API_URL}/image/search", params=kwargs, verify=False) - assert response.status_code == 200 - parsed = response.json() - return parsed - - return _parameterized_search - - -@pytest.fixture -def search_with_dead_links(search_factory): - """Test with ``filter_dead`` parameter set to ``False``.""" - - def _search_with_dead_links(**kwargs): - return search_factory(filter_dead=False, **kwargs) - - return _search_with_dead_links - - -@pytest.fixture -def search_without_dead_links(search_factory): - """Test with ``filter_dead`` parameter set to ``True``.""" - - def _search_without_dead_links(**kwargs): - return search_factory(filter_dead=True, **kwargs) - - return _search_without_dead_links - - -def test_page_size_removing_dead_links(search_without_dead_links): - """ - Test whether the number of results returned is equal to the requested page size. - - We have about 500 dead links in the sample data and should have around - 8 dead links in the first 100 results on a query composed of a single - wildcard operator. 
- - """ - data = search_without_dead_links(q="*", pagesize=100) - assert len(data["results"]) == 100 - - -def test_dead_links_are_correctly_filtered( - search_with_dead_links, search_without_dead_links -): - """ - Test the results for the same query with and without dead links are different. - - We use the results' id to compare them. - """ - data_with_dead_links = search_with_dead_links(q="*", pagesize=100) - data_without_dead_links = search_without_dead_links(q="*", pagesize=100) - - comparisons = [] - for result_1 in data_with_dead_links["results"]: - for result_2 in data_without_dead_links["results"]: - comparisons.append(result_1["id"] == result_2["id"]) - - # Some results should be different - # so we should have less than 100 True comparisons - assert comparisons.count(True) < 100 - - -def test_page_consistency_removing_dead_links(search_without_dead_links): - """Test that results in consecutive pages don't repeat when filtering dead links.""" - - total_pages = 100 - pagesize = 5 - - page_results = [] - for page in range(1, total_pages + 1): - page_data = search_without_dead_links(q="*", pagesize=pagesize, page=page) - page_results += page_data["results"] - - def no_duplicates(xs): - s = set() - for x in xs: - if x in s: - return False - s.add(x) - return True - - ids = list(map(lambda x: x["id"], page_results)) - # No results should be repeated so we should have no duplicate ids - assert no_duplicates(ids) - - -def test_related_does_not_break(): - response = requests.get( - f"{API_URL}/image/related/000000000000000000000000000000000000", verify=False - ) - assert response.status_code == 404 - - -@pytest.fixture -def related_factory(): - """Allow passing url parameters along with a related images request.""" - - def _parameterized_search(identifier, **kwargs): - response = requests.get( - f"{API_URL}/image/related/{identifier}", params=kwargs, verify=False - ) - assert response.status_code == 200 - parsed = response.json() - return parsed - - return 
_parameterized_search - - -@pytest.mark.skip( - reason="Generally, we don't paginate related images, so " - "consistency is less of an issue." -) -def test_related_image_search_page_consistency( - related_factory, search_without_dead_links -): - initial_images = search_without_dead_links(q="*", pagesize=10) - for image in initial_images["results"]: - related = related_factory(image["id"]) - assert related["result_count"] > 0 - assert len(related["results"]) == 10 diff --git a/api/test/api_live_search_qa.py b/api/test/api_live_search_qa.py deleted file mode 100644 index 550d341c782..00000000000 --- a/api/test/api_live_search_qa.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -Tests to run against a live Openverse instance with a significant (10M+) record count. - -Quality of search rankings can be affected by the number of documents in the search -index, so toy examples with few documents do not accurately model relevance at scale. -""" - -import json - -import requests - - -API_URL = "https://api-dev.openverse.engineering" - - -def _phrase_in_tags(tags, term): - for tag in tags: - if "name" in tag: - if tag["name"] == term: - return True - return False - - -def _phrase_in_title(title, term): - return term in title - - -def test_phrase_relevance(): - """ - Test that results have the phrase in the tags or title. - - If I search for "home office", the top results ought to have the phrase - 'home office' in the tags or title. 
- """ - - search_term = "home office" - response = requests.get(f"{API_URL}/image/search?q={search_term}", verify=False) - assert response.status_code == 200 - parsed = json.loads(response.text) - first_result = parsed["results"][0] - assert _phrase_in_tags(first_result["tags"], search_term) or _phrase_in_title( - first_result["title"], search_term - ) diff --git a/api/test/conftest.py b/api/test/conftest.py index 8ffefc50e7a..1aecc9b86c7 100644 --- a/api/test/conftest.py +++ b/api/test/conftest.py @@ -1,3 +1,5 @@ +"""Fixtures usable by or necessary for both unit and integration tests.""" + from test.fixtures.asynchronous import ensure_asgi_lifecycle, get_new_loop, session_loop from test.fixtures.cache import ( django_cache, @@ -5,6 +7,7 @@ unreachable_django_cache, unreachable_redis, ) +from test.fixtures.rest_framework import api_client, request_factory __all__ = [ @@ -15,4 +18,6 @@ "redis", "unreachable_django_cache", "unreachable_redis", + "api_client", + "request_factory", ] diff --git a/api/test/fixtures/rest_framework.py b/api/test/fixtures/rest_framework.py new file mode 100644 index 00000000000..3359b0a81df --- /dev/null +++ b/api/test/fixtures/rest_framework.py @@ -0,0 +1,15 @@ +from rest_framework.test import APIClient, APIRequestFactory + +import pytest + + +@pytest.fixture +def api_client(): + return APIClient() + + +@pytest.fixture +def request_factory() -> APIRequestFactory: + request_factory = APIRequestFactory(defaults={"REMOTE_ADDR": "192.0.2.1"}) + + return request_factory diff --git a/api/test/integration/conftest.py b/api/test/integration/conftest.py new file mode 100644 index 00000000000..bd9e998175c --- /dev/null +++ b/api/test/integration/conftest.py @@ -0,0 +1,13 @@ +import pytest + + +@pytest.fixture +def django_db_setup(): + """ + We want the integration tests to use the real database so that we can test + the complete behaviour of the system. 
This fixture overrides the fixture + from ``pytest-django`` that sets up the tests database and because it's a + no-op, the tests will use the real database. + """ + + pass diff --git a/api/test/integration/test_audio_integration.py b/api/test/integration/test_audio_integration.py new file mode 100644 index 00000000000..32e4bfb1e70 --- /dev/null +++ b/api/test/integration/test_audio_integration.py @@ -0,0 +1,29 @@ +""" +End-to-end API tests for audio. + +Can be used to verify a live deployment is functioning as designed. +Run with the `pytest -s` command from this directory, inside the Docker +container. + +Tests common to all media types are in ``test_media_integration.py``. +""" + +import pytest + + +pytestmark = pytest.mark.django_db + + +def test_audio_detail_without_thumb(api_client): + resp = api_client.get("/v1/audio/44540200-91eb-483d-9e99-38ce86a52fb6/") + assert resp.status_code == 200 + parsed = resp.json() + assert parsed["thumbnail"] is None + + +def test_audio_search_without_thumb(api_client): + """The first audio of this search should not have a thumbnail.""" + resp = api_client.get("/v1/audio/?q=zaus") + assert resp.status_code == 200 + parsed = resp.json() + assert parsed["results"][0]["thumbnail"] is None diff --git a/api/test/test_auth.py b/api/test/integration/test_auth.py similarity index 72% rename from api/test/test_auth.py rename to api/test/integration/test_auth.py index 752c848fed4..b6a7b492194 100644 --- a/api/test/test_auth.py +++ b/api/test/integration/test_auth.py @@ -1,9 +1,7 @@ import time import uuid -from unittest.mock import patch from django.urls import reverse -from django.utils.http import urlencode import pytest from oauth2_provider.models import AccessToken @@ -38,13 +36,13 @@ def unreachable_oauth_cache(unreachable_django_cache, monkeypatch): @pytest.mark.django_db @pytest.fixture -def test_auth_tokens_registration(client): +def test_auth_tokens_registration(api_client): data = { "name": f"INTEGRATION TEST APPLICATION 
{uuid.uuid4()}", "description": "A key for testing the OAuth2 registration process.", "email": "example@example.org", } - res = client.post( + res = api_client.post( "/v1/auth_tokens/register/", data, verify=False, @@ -56,20 +54,19 @@ def test_auth_tokens_registration(client): @pytest.mark.django_db @pytest.fixture -def test_auth_token_exchange(client, test_auth_tokens_registration): - client_id = test_auth_tokens_registration["client_id"] - client_secret = test_auth_tokens_registration["client_secret"] - data = urlencode( - { - "client_id": client_id, - "client_secret": client_secret, - "grant_type": "client_credentials", - } - ) - res = client.post( +def test_auth_token_exchange(api_client, test_auth_tokens_registration): + api_client_id = test_auth_tokens_registration["client_id"] + api_client_secret = test_auth_tokens_registration["client_secret"] + data = { + "client_id": api_client_id, + "client_secret": api_client_secret, + "grant_type": "client_credentials", + } + + res = api_client.post( "/v1/auth_tokens/token/", data, - "application/x-www-form-urlencoded", + "multipart", verify=False, ) res_data = res.json() @@ -78,8 +75,8 @@ def test_auth_token_exchange(client, test_auth_tokens_registration): @pytest.mark.django_db -def test_auth_token_exchange_unsupported_method(client): - res = client.get( +def test_auth_token_exchange_unsupported_method(api_client): + res = api_client.get( "/v1/auth_tokens/token/", verify=False, ) @@ -87,11 +84,11 @@ def test_auth_token_exchange_unsupported_method(client): assert res.json()["detail"] == 'Method "GET" not allowed.' 
-def _integration_verify_most_recent_token(client): +def _integration_verify_most_recent_token(api_client): verify = OAuth2Verification.objects.last() code = verify.code path = reverse("verify-email", args=[code]) - return client.get(path) + return api_client.get(path) @pytest.mark.django_db @@ -110,17 +107,17 @@ def _integration_verify_most_recent_token(client): ) def test_auth_email_verification( request, - client, + api_client, is_cache_reachable, cache_name, rate_limit_model, test_auth_token_exchange, ): - res = _integration_verify_most_recent_token(client) + res = _integration_verify_most_recent_token(api_client) assert res.status_code == 200 test_auth_rate_limit_reporting( request, - client, + api_client, is_cache_reachable, cache_name, rate_limit_model, @@ -137,7 +134,7 @@ def test_auth_email_verification( @cache_availability_params def test_auth_rate_limit_reporting( request, - client, + api_client, is_cache_reachable, cache_name, rate_limit_model, @@ -153,7 +150,7 @@ def test_auth_rate_limit_reporting( application = AccessToken.objects.get(token=token).application application.rate_limit_model = rate_limit_model application.save() - res = client.get("/v1/rate_limit/", HTTP_AUTHORIZATION=f"Bearer {token}") + res = api_client.get("/v1/rate_limit/", HTTP_AUTHORIZATION=f"Bearer {token}") res_data = res.json() if is_cache_reachable: assert res.status_code == 200 @@ -176,14 +173,14 @@ def test_auth_rate_limit_reporting( (True, False), ) def test_auth_response_headers( - client, verified, test_auth_tokens_registration, test_auth_token_exchange + api_client, verified, test_auth_tokens_registration, test_auth_token_exchange ): if verified: - _integration_verify_most_recent_token(client) + _integration_verify_most_recent_token(api_client) token = test_auth_token_exchange["access_token"] - res = client.get("/v1/images/", HTTP_AUTHORIZATION=f"Bearer {token}") + res = api_client.get("/v1/images/", HTTP_AUTHORIZATION=f"Bearer {token}") assert ( 
res.headers["x-ov-client-application-name"] @@ -192,8 +189,8 @@ def test_auth_response_headers( assert res.headers["x-ov-client-application-verified"] == str(verified) -def test_unauthed_response_headers(client): - res = client.get("/v1/images") +def test_unauthed_response_headers(api_client): + res = api_client.get("/v1/images") assert "x-ov-client-application-name" not in res.headers assert "x-ov-client-application-verified" not in res.headers @@ -207,21 +204,16 @@ def test_unauthed_response_headers(client): ("asc", "2022-01-01"), ], ) -def test_sorting_authed(client, test_auth_token_exchange, sort_dir, exp_indexed_on): +def test_sorting_authed(api_client, test_auth_token_exchange, sort_dir, exp_indexed_on): time.sleep(1) token = test_auth_token_exchange["access_token"] query_params = { "unstable__sort_by": "indexed_on", "unstable__sort_dir": sort_dir, } - with patch( - "api.views.image_views.ImageViewSet.get_db_results" - ) as mock_get_db_result: - mock_get_db_result.side_effect = lambda value: value - - res = client.get( - "/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}" - ) + res = api_client.get( + "/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}" + ) assert res.status_code == 200 res_data = res.json() @@ -238,7 +230,7 @@ def test_sorting_authed(client, test_auth_token_exchange, sort_dir, exp_indexed_ ], ) def test_authority_authed( - client, test_auth_token_exchange, authority_boost, exp_source + api_client, test_auth_token_exchange, authority_boost, exp_source ): time.sleep(1) token = test_auth_token_exchange["access_token"] @@ -247,14 +239,9 @@ def test_authority_authed( "unstable__authority": "true", "unstable__authority_boost": authority_boost, } - with patch( - "api.views.image_views.ImageViewSet.get_db_results" - ) as mock_get_db_result: - mock_get_db_result.side_effect = lambda value: value - - res = client.get( - "/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}" - ) + res = api_client.get( + 
"/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}" + ) assert res.status_code == 200 res_data = res.json() @@ -263,23 +250,27 @@ def test_authority_authed( @pytest.mark.django_db -def test_page_size_limit_unauthed(client): +def test_page_size_limit_unauthed(api_client): query_params = {"page_size": 20} - res = client.get("/v1/images/", query_params) + res = api_client.get("/v1/images/", query_params) assert res.status_code == 200 query_params["page_size"] = 21 - res = client.get("/v1/images/", query_params) + res = api_client.get("/v1/images/", query_params) assert res.status_code == 401 @pytest.mark.django_db -def test_page_size_limit_authed(client, test_auth_token_exchange): +def test_page_size_limit_authed(api_client, test_auth_token_exchange): time.sleep(1) token = test_auth_token_exchange["access_token"] query_params = {"page_size": 21} - res = client.get("/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}") + res = api_client.get( + "/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}" + ) assert res.status_code == 200 query_params = {"page_size": 500} - res = client.get("/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}") + res = api_client.get( + "/v1/images/", query_params, HTTP_AUTHORIZATION=f"Bearer {token}" + ) assert res.status_code == 200 diff --git a/api/test/test_dead_link_filter.py b/api/test/integration/test_dead_link_filter.py similarity index 91% rename from api/test/test_dead_link_filter.py rename to api/test/integration/test_dead_link_filter.py index caa4600fb7a..6fe7de4cca4 100644 --- a/api/test/test_dead_link_filter.py +++ b/api/test/integration/test_dead_link_filter.py @@ -4,10 +4,8 @@ from django.conf import settings import pytest -import requests from api.controllers.elasticsearch.helpers import DEAD_LINK_RATIO -from test.constants import API_URL @pytest.fixture @@ -62,7 +60,7 @@ def _make_head_requests(urls): @pytest.mark.django_db @_patch_make_head_requests() -def 
test_dead_link_filtering(mocked_map, client): +def test_dead_link_filtering(mocked_map, api_client): path = "/v1/images/" query_params = {"q": "*", "page_size": 20} @@ -71,7 +69,7 @@ def test_dead_link_filtering(mocked_map, client): "api.views.image_views.ImageViewSet.get_db_results" ) as mock_get_db_result: mock_get_db_result.side_effect = lambda value: value - res_with_dead_links = client.get( + res_with_dead_links = api_client.get( path, query_params | {"filter_dead": False}, ) @@ -79,7 +77,7 @@ def test_dead_link_filtering(mocked_map, client): mocked_map.assert_not_called() # Make a request that filters dead links... - res_without_dead_links = client.get( + res_without_dead_links = api_client.get( path, query_params | {"filter_dead": True}, ) @@ -111,7 +109,7 @@ def test_dead_link_filtering(mocked_map, client): ), ) def test_dead_link_filtering_all_dead_links( - client, + api_client, filter_dead, page_size, expected_result_count, @@ -126,7 +124,7 @@ def test_dead_link_filtering_all_dead_links( ) as mock_get_db_result: mock_get_db_result.side_effect = lambda value: value with patch_link_validation_dead_for_count(page_size / DEAD_LINK_RATIO): - response = client.get( + response = api_client.get( path, query_params | {"filter_dead": filter_dead}, ) @@ -141,11 +139,11 @@ def test_dead_link_filtering_all_dead_links( @pytest.fixture -def search_factory(client): +def search_factory(api_client): """Allow passing url parameters along with a search request.""" def _parameterized_search(**kwargs): - response = requests.get(f"{API_URL}/v1/images", params=kwargs, verify=False) + response = api_client.get("/v1/images/", kwargs) assert response.status_code == 200 parsed = response.json() return parsed @@ -203,10 +201,8 @@ def no_duplicates(xs): @pytest.mark.django_db -def test_max_page_count(): - response = requests.get( - f"{API_URL}/v1/images", - params={"page": settings.MAX_PAGINATION_DEPTH + 1}, - verify=False, +def test_max_page_count(api_client): + response = 
api_client.get( + "/v1/images/", {"page": settings.MAX_PAGINATION_DEPTH + 1} ) assert response.status_code == 400 diff --git a/api/test/integration/test_deprecations.py b/api/test/integration/test_deprecations.py new file mode 100644 index 00000000000..a6b3d30bebf --- /dev/null +++ b/api/test/integration/test_deprecations.py @@ -0,0 +1,38 @@ +import uuid + +import pytest + + +@pytest.mark.parametrize( + "old, new", + [ + ("/v1/sources?type=images", "/v1/images/stats/"), + ("/v1/recommendations/images/{idx}", "/v1/images/{idx}/related/"), + ("/v1/oembed?key=value", "/v1/images/oembed/?key=value"), + ("/v1/thumbs/{idx}", "/v1/images/{idx}/thumb/"), + ], +) +def test_deprecated_endpoints_redirect_to_new(old, new, api_client): + idx = uuid.uuid4() + old = old.format(idx=str(idx)) + new = new.format(idx=str(idx)) + + res = api_client.get(old) + assert res.status_code == 301 + assert res.headers.get("Location") == new + + +@pytest.mark.parametrize( + "method, path, kwargs", + [ + ("get", "/v1/link/abc", {}), + ( + "post", + "/v1/link/", + {"data": {"full_url": "abcd"}, "content_type": "application/json"}, + ), + ], +) +def test_deleted_endpoints_are_gone(method, path, kwargs, api_client): + res = getattr(api_client, method)(path, **kwargs) + assert res.status_code == 410 diff --git a/api/test/integration/test_image_integration.py b/api/test/integration/test_image_integration.py new file mode 100644 index 00000000000..7ee3ac03cbe --- /dev/null +++ b/api/test/integration/test_image_integration.py @@ -0,0 +1,73 @@ +""" +End-to-end API tests for images. + +Can be used to verify a live deployment is functioning as designed. +Run with the `pytest -s` command from this directory, inside the Docker +container. + +Tests common to all media types are in ``test_media_integration.py``. 
+""" + +import pytest + + +pytestmark = pytest.mark.django_db + + +@pytest.fixture +def image_fixture(api_client): + response = api_client.get("/v1/images/", {"q": "dog"}) + assert response.status_code == 200 + parsed = response.json() + return parsed + + +@pytest.mark.parametrize( + "url, expected_status_code", + [ + pytest.param( + "https://any.domain/any/path/{identifier}", + 200, + id="OK; no trailing slash", + ), + pytest.param( + "https://any.domain/any/path/{identifier}/", + 200, + id="OK; trailing slash", + ), # trailing slash + pytest.param( + "https://any.domain/any/path/00000000-0000-0000-0000-000000000000", + 404, + id="not OK; valid UUID but no matching identifier", + ), + pytest.param( + "https://any.domain/any/path/not-a-valid-uuid", + 400, + id="not OK; invalid UUID", + ), + ], +) +def test_oembed_endpoint( + image_fixture, url: str, expected_status_code: int, api_client +): + if "{identifier}" in url: + url = url.format(identifier=image_fixture["results"][0]["id"]) + params = {"url": url} + response = api_client.get("/v1/images/oembed/", params) + assert response.status_code == expected_status_code + + +def test_oembed_endpoint_for_json(image_fixture, api_client): + identifier = image_fixture["results"][0]["id"] + params = { + "url": f"https://any.domain/any/path/{identifier}", + # 'format': 'json' is the default + } + response = api_client.get("/v1/images/oembed/", params) + assert response.status_code == 200 + assert response.headers["Content-Type"] == "application/json" + + parsed = response.json() + assert parsed["width"] == image_fixture["results"][0]["width"] + assert parsed["height"] == image_fixture["results"][0]["height"] + assert parsed["license_url"] == image_fixture["results"][0]["license_url"] diff --git a/api/test/integration/test_media_integration.py b/api/test/integration/test_media_integration.py new file mode 100644 index 00000000000..ee725c2f70a --- /dev/null +++ b/api/test/integration/test_media_integration.py @@ -0,0 +1,447 @@ 
+"""This test suite covers common operations for all media types.""" + +import re +from dataclasses import dataclass + +import pytest + +from api.constants.licenses import LICENSE_GROUPS + + +pytestmark = pytest.mark.django_db + + +@dataclass +class MediaType: + name: str # the name of the media type + path: str # the version of the media type in the URL paths + providers: list[str] # providers for the media type from the sample data + categories: list[str] # categories for the media type from the sample data + tags: list[str] # tags for the media type from the sample data + q: str # a search query for this media type that yields some results + + +def _check_non_es_fields_are_present(results: list[dict]): + for result in results: + # ``license`` is stored in ES, ``license_version`` is not. + assert result["license_version"] is not None + # ``creator`` is stored in ES, ``creator_url`` is not. + assert result["creator_url"] is not None + # ``foreign_landing_url`` is not stored in ES. + assert result["foreign_landing_url"] is not None + + +############ +# Fixtures # +############ + + +@pytest.fixture(params=["audio", "image"]) +def media_type(request): + """ + Get a ``MediaType`` object associated with each media type supported by + Openverse. This fixture is used to parametrize tests and other dependent + fixtures so that the overall test suite covers all supported media types. 
+ """ + + name = request.param + return { + "audio": MediaType( + name="audio", + path="audio", + providers=["freesound", "jamendo", "wikimedia_audio"], + categories=["music", "pronunciation"], + tags=["cat"], + q="love", + ), + "image": MediaType( + name="image", + path="images", + providers=["flickr", "stocksnap"], + categories=["photograph"], + tags=["cat", "Cat"], + q="dog", + ), + }[name] + + +@pytest.fixture +def search_results(media_type: MediaType, api_client) -> tuple[MediaType, dict]: + res = api_client.get(f"/v1/{media_type.path}/", {"q": media_type.q}) + assert res.status_code == 200 + + data = res.json() + return media_type, data + + +@pytest.fixture +def single_result(search_results) -> tuple[MediaType, dict]: + media_type, data = search_results + item = data["results"][0] + return media_type, item + + +@pytest.fixture +def related_results(single_result, api_client) -> tuple[MediaType, dict, dict]: + media_type, item = single_result + res = api_client.get(f"/v1/{media_type.path}/{item['id']}/related/") + assert res.status_code == 200 + + data = res.json() + return media_type, item, data + + +@pytest.fixture +def sensitive_result(media_type: MediaType, api_client) -> tuple[MediaType, dict]: + q = "bird" # Not using the default ``q`` from ``media_type``. + res = api_client.get( + f"/v1/{media_type.path}/", + {"q": q, "unstable__include_sensitive_results": True}, + ) + assert res.status_code == 200 + + data = res.json() + # Raises ``StopIteration`` if no sensitive results are found. 
+ sensitive_result = next( + result for result in data["results"] if result["unstable__sensitivity"] + ) + + return media_type, sensitive_result + + +############## +# Stats view # +############## + + +def test_stats(media_type: MediaType, api_client): + res = api_client.get(f"/v1/{media_type.path}/stats/") + data = res.json() + num_media = 0 + provider_count = 0 + for pair in data: + num_media += pair["media_count"] + provider_count += 1 + assert num_media > 0 + assert provider_count > 0 + + +############### +# Search view # +############### + + +def test_search_returns_non_zero_results(search_results): + _, data = search_results + assert data["result_count"] > 0 + + +def test_search_handles_unbalanced_quotes_with_ok(media_type: MediaType, api_client): + res = api_client.get(f"/v1/{media_type.path}/", {"q": f'"{media_type.q}'}) + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + + +def test_search_handles_special_chars_with_ok(media_type: MediaType, api_client): + res = api_client.get(f"/v1/{media_type.path}/", {"q": f"{media_type.q}!"}) + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + + +def test_search_results_have_non_es_fields(search_results): + _, data = search_results + _check_non_es_fields_are_present(data["results"]) + + +def test_search_removes_dupes_from_initial_pages(media_type: MediaType, api_client): + """ + Return consistent, non-duplicate results in the first n pages. + + Elasticsearch sometimes reaches an inconsistent state, which causes search + results to appear differently upon page refresh. This can also introduce + image duplicates in subsequent pages. This test ensures that no duplicates + appear in the first few pages of a search query. 
+ """ + + num_pages = 5 + + searches = { + api_client.get(f"/v1/{media_type.path}/", {"page": page}) + for page in range(1, num_pages) + } + + results = set() + for res in searches: + parsed = res.json() + for result in parsed["results"]: + media_id = result["id"] + assert media_id not in results # Ensure that each result is new. + results.add(media_id) + + +@pytest.mark.parametrize( + "search_field, match_field", [("q", "title"), ("creator", "creator")] +) +def test_search_quotes_matches_only_exact( + media_type: MediaType, search_field, match_field, api_client +): + # We want a query containing more than one word. + if match_field == "title": + q = "dancing penguins" + else: + q = "The League" if media_type.name == "audio" else "Steve Wedgwood" + + base_params = {"unstable__include_sensitive_results": True} + path = f"/v1/{media_type.path}/" + + unquoted_res = api_client.get(path, base_params | {search_field: q}) + assert unquoted_res.status_code == 200 + + unquoted_data = unquoted_res.json() + unquoted_result_count = unquoted_data["result_count"] + assert unquoted_result_count > 0 + + unquoted_results = unquoted_data["results"] + exact_matches = [q in item[match_field] for item in unquoted_results].count(True) + assert 0 < exact_matches < unquoted_result_count + + quoted_res = api_client.get(path, base_params | {search_field: f'"{q}"'}) + assert quoted_res.status_code == 200 + + quoted_data = quoted_res.json() + quoted_result_count = quoted_data["result_count"] + assert quoted_result_count > 0 + + quoted_results = quoted_data["results"] + assert all([q in item[match_field] for item in quoted_results]) + + # Unquoted results will match more records due to the query being overall + # less strict. Above we check that the results are not 0 to confirm that we + # do still get results back. 
+ assert quoted_result_count < unquoted_result_count + + +def test_search_filters_by_source(media_type: MediaType, api_client): + provider = media_type.providers[0] + res = api_client.get( + f"/v1/{media_type.path}/", + {"q": media_type.q, "source": provider}, + ) + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + assert all(result["source"] == provider for result in data["results"]) + + +def test_search_returns_zero_results_when_all_excluded( + media_type: MediaType, api_client +): + res = api_client.get( + f"/v1/{media_type.path}/", + {"q": media_type.q, "excluded_source": ",".join(media_type.providers)}, + ) + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] == 0 + + +def test_search_refuses_both_sources_and_excluded(media_type: MediaType, api_client): + res = api_client.get( + f"/v1/{media_type.path}/", + {"q": media_type.q, "source": "x", "excluded_source": "y"}, + ) + assert res.status_code == 400 + + +@pytest.mark.parametrize( + "filter_rule, exp_licenses", + [ + ({"license_type": "commercial"}, LICENSE_GROUPS["commercial"]), # license group + ( + {"license_type": "commercial,modification"}, + LICENSE_GROUPS["commercial"] & LICENSE_GROUPS["modification"], + ), # multiple license groups + ({"license": "by"}, ["by"]), # exact license + ({"license": "by,by-nc-nd"}, ["by", "by-nc-nd"]), # multiple exact licenses + ({"license": "bY"}, ["by"]), # case insensitive + ], +) +def test_search_filters_by_license( + media_type: MediaType, filter_rule, exp_licenses, api_client +): + res = api_client.get(f"/v1/{media_type.path}/", filter_rule) + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + assert all(result["license"] in exp_licenses for result in data["results"]) + + +def test_search_filters_by_extension(media_type: MediaType, api_client): + ext = "mp3" if media_type.name == "audio" else "jpg" + res = api_client.get(f"/v1/{media_type.path}/", {"extension": 
ext}) + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + assert all(result["filetype"] == ext for result in data["results"]) + + +def test_search_filters_by_category(media_type: MediaType, api_client): + for category in media_type.categories: + res = api_client.get(f"/v1/{media_type.path}/", {"category": category}) + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + assert all(result["category"] == category for result in data["results"]) + + +def test_search_refuses_invalid_categories(media_type: MediaType, api_client): + res = api_client.get(f"/v1/{media_type.path}/", {"category": "invalid_category"}) + assert res.status_code == 400 + + +################ +# Detail view # +################ + + +@pytest.mark.parametrize( + "bad_uuid", + [ + "123456789123456789123456789123456789", + "12345678-1234-5678-1234-1234567891234", + "abcd", + ], +) +def test_detail_view_for_invalid_uuids_returns_not_found( + media_type: MediaType, bad_uuid: str, api_client +): + res = api_client.get(f"/v1/{media_type.path}/{bad_uuid}/") + assert res.status_code == 404 + + +def test_detail_view_returns_ok(single_result, api_client): + media_type, item = single_result + res = api_client.get(f"/v1/{media_type.path}/{item['id']}/") + assert res.status_code == 200 + + +def test_detail_view_contains_sensitivity_info(sensitive_result, api_client): + media_type, item = sensitive_result + res = api_client.get(f"/v1/{media_type.path}/{item['id']}/") + assert res.status_code == 200 + + data = res.json() + assert data["unstable__sensitivity"] is not None + assert len(data["unstable__sensitivity"]) > 0 + + +################ +# Related view # +################ + + +def test_related_view_has_no_pagination(related_results): + _, _, data = related_results + results = data["results"] + assert data["result_count"] == len(results) == 10 + assert data["page_count"] == 1 + + +def 
test_related_results_have_something_in_common_with_parent(related_results): + _, item, data = related_results + + def _get_terms_set(obj): + # The title is analyzed in ES, we try to mimic it here. + terms = [t["name"] for t in obj["tags"]] + re.split(r"[\s-]", obj["title"]) + return {t.lower() for t in terms} + + terms_set = _get_terms_set(item) + # Make sure each result has at least one word in common with the original item, + # or is by the same creator. + for result in data["results"]: + assert ( + len(terms_set.intersection(_get_terms_set(result))) > 0 + or result["creator"] == item["creator"] + ), f"{terms_set} {_get_terms_set(result)}/{result['creator']}-{item['creator']}" + + +def test_related_results_have_non_es_fields(related_results): + *_, data = related_results + _check_non_es_fields_are_present(data["results"]) + + +############### +# Report view # +############### + + +def test_report_is_created(single_result, api_client): + media_type, item = single_result + res = api_client.post( + f"/v1/{media_type.path}/{item['id']}/report/", + { + "reason": "mature", + "description": "This item contains sensitive content", + }, + "json", + ) + assert res.status_code == 201 + + data = res.json() + assert data["identifier"] == item["id"] + + +#################### +# Collection views # +#################### + + +def test_collection_by_tag(media_type: MediaType, api_client): + tags = media_type.tags + for tag in tags: + res = api_client.get(f"/v1/{media_type.path}/tag/{tag}/") + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + for result in data["results"]: + tag_names = [tag["name"] for tag in result["tags"]] + assert tag in tag_names + + +def test_collection_by_source(media_type: MediaType, api_client): + source = api_client.get(f"/v1/{media_type.path}/stats/").json()[0]["source_name"] + + res = api_client.get(f"/v1/{media_type.path}/source/{source}/") + assert res.status_code == 200 + + data = res.json() + assert 
data["result_count"] > 0 + assert all(result["source"] == source for result in data["results"]) + + +def test_collection_by_creator(media_type: MediaType, api_client): + source_res = api_client.get(f"/v1/{media_type.path}/stats/") + source = source_res.json()[0]["source_name"] + + first_res = api_client.get(f"/v1/{media_type.path}/source/{source}/") + first = first_res.json()["results"][0] + assert (creator := first.get("creator")) + + res = api_client.get(f"/v1/{media_type.path}/source/{source}/creator/{creator}/") + assert res.status_code == 200 + + data = res.json() + assert data["result_count"] > 0 + for result in data["results"]: + assert result["source"] == source + assert result["creator"] == creator diff --git a/api/test/media_integration.py b/api/test/media_integration.py deleted file mode 100644 index 2ca5eb699c9..00000000000 --- a/api/test/media_integration.py +++ /dev/null @@ -1,258 +0,0 @@ -""" -Base test cases for all media types. - -These are not tests and cannot be invoked. 
-""" - -import json -import re - -import requests - -from test.constants import API_URL - - -def search(fixture): - """Return results for test query.""" - - assert fixture["result_count"] > 0 - - -def search_by_category(media_path, category, fixture): - response = requests.get(f"{API_URL}/v1/{media_path}?category={category}") - assert response.status_code == 200 - data = json.loads(response.text) - assert data["result_count"] < fixture["result_count"] - results = data["results"] - # Make sure each result is from the specified category - assert all(audio_item["category"] == category for audio_item in results) - - -def tag_collection(media_path, tag="cat"): - response = requests.get(f"{API_URL}/v1/{media_path}/tag/{tag}") - assert response.status_code == 200 - - results = response.json()["results"] - for r in results: - tag_names = [tag["name"] for tag in r["tags"]] - assert tag in tag_names - - -def source_collection(media_path): - source = requests.get(f"{API_URL}/v1/{media_path}/stats").json()[0]["source_name"] - - response = requests.get(f"{API_URL}/v1/{media_path}/source/{source}") - assert response.status_code == 200 - - results = response.json()["results"] - assert all(result["source"] == source for result in results) - - -def creator_collection(media_path): - source = requests.get(f"{API_URL}/v1/{media_path}/stats").json()[0]["source_name"] - - first_res = requests.get(f"{API_URL}/v1/{media_path}/source/{source}").json()[ - "results" - ][0] - if not (creator := first_res.get("creator")): - raise AttributeError(f"No creator in {first_res}") - - response = requests.get( - f"{API_URL}/v1/{media_path}/source/{source}/creator/{creator}" - ) - assert response.status_code == 200 - - results = response.json()["results"] - for result in results: - assert result["source"] == source, f"{result['source']} != {source}" - assert result["creator"] == creator, f"{result['creator']} != {creator}" - - -def search_all_excluded(media_path, excluded_source): - response = 
requests.get( - f"{API_URL}/v1/{media_path}?q=test&excluded_source={','.join(excluded_source)}" - ) - data = json.loads(response.text) - assert data["result_count"] == 0 - - -def search_source_and_excluded(media_path): - response = requests.get( - f"{API_URL}/v1/{media_path}?q=test&source=x&excluded_source=y" - ) - assert response.status_code == 400 - - -def search_quotes(media_path, q="test"): - """Return a response when quote matching is messed up.""" - - response = requests.get(f'{API_URL}/v1/{media_path}?q="{q}', verify=False) - assert response.status_code == 200 - - -def search_quotes_exact(media_path, q): - """Return only exact matches for the given query.""" - - url_format = ( - f"{API_URL}/v1/{media_path}?q={{q}}&unstable__include_sensitive_results=true" - ) - unquoted_response = requests.get(url_format.format(q=q), verify=False) - assert unquoted_response.status_code == 200 - unquoted_result_count = unquoted_response.json()["result_count"] - assert unquoted_result_count > 0 - unquoted_results = unquoted_response.json()["results"] - titles = [res["title"] for res in unquoted_results] - exact_match_count = sum([1 for t in titles if q in t]) - assert exact_match_count > 0, f"No results contain `{q}` in title: {titles}" - assert exact_match_count < len( - titles - ), f"Unquoted search returned only exact matches: {titles}" - - quoted_response = requests.get(url_format.format(q=f'"{q}"'), verify=False) - assert quoted_response.status_code == 200 - quoted_result_count = quoted_response.json()["result_count"] - assert quoted_result_count > 0 - - # The rationale here is that the unquoted results will match more records due - # to the query being overall less strict. Quoting the query will make it more - # strict causing it to return fewer results. - # Above we check that the results are not 0 to confirm that we do still get results back. 
- assert quoted_result_count < unquoted_result_count - - quoted_result_titles = [res["title"] for res in quoted_response.json()["results"]] - assert all( - [q in title for title in quoted_result_titles] - ), f"Not all titles contain exact match for `{q}`: {quoted_result_titles}" - - -def search_special_chars(media_path, q="test"): - """Return a response when query includes special characters.""" - - response = requests.get(f"{API_URL}/v1/{media_path}?q={q}!", verify=False) - assert response.status_code == 200 - - -def search_consistency( - media_path, - n_pages, -): - """ - Return consistent, non-duplicate results in the first n pages. - - Elasticsearch sometimes reaches an inconsistent state, which causes search - results to appear differently upon page refresh. This can also introduce - image duplicates in subsequent pages. This test ensures that no duplicates - appear in the first few pages of a search query. - """ - - searches = { - requests.get(f"{API_URL}/v1/{media_path}?page={page}", verify=False) - for page in range(1, n_pages) - } - - results = set() - for response in searches: - parsed = json.loads(response.text) - for result in parsed["results"]: - media_id = result["id"] - assert media_id not in results - results.add(media_id) - - -def detail(media_type, fixture): - test_id = fixture["results"][0]["id"] - response = requests.get(f"{API_URL}/v1/{media_type}/{test_id}", verify=False) - assert response.status_code == 200 - - -def stats(media_type, count_key="media_count"): - response = requests.get(f"{API_URL}/v1/{media_type}/stats", verify=False) - parsed_response = json.loads(response.text) - assert response.status_code == 200 - num_media = 0 - provider_count = 0 - for pair in parsed_response: - media_count = pair[count_key] - num_media += int(media_count) - provider_count += 1 - assert num_media > 0 - assert provider_count > 0 - - -def report(media_type, fixture): - test_id = fixture["results"][0]["id"] - response = requests.post( - 
f"{API_URL}/v1/{media_type}/{test_id}/report/", - json={ - "reason": "mature", - "description": "This item contains sensitive content", - }, - verify=False, - ) - assert response.status_code == 201 - data = json.loads(response.text) - assert data["identifier"] == test_id - - -def license_filter_case_insensitivity(media_type): - response = requests.get(f"{API_URL}/v1/{media_type}?license=bY", verify=False) - parsed = json.loads(response.text) - assert parsed["result_count"] > 0 - - -def uuid_validation(media_type, identifier): - response = requests.get(f"{API_URL}/v1/{media_type}/{identifier}", verify=False) - assert response.status_code == 404 - - -def related(fixture): - item = fixture["results"][0] - - response = requests.get(item["related_url"]).json() - results = response["results"] - - assert response["result_count"] == len(results) == 10 - assert response["page_count"] == 1 - - def get_terms_set(res): - # The title is analyzed in ES, we try to mimic it here. - terms = [t["name"] for t in res["tags"]] + re.split(" |-", res["title"]) - return {t.lower() for t in terms} - - terms_set = get_terms_set(item) - # Make sure each result has at least one word in common with the original item, - # or is by the same creator. 
- for result in results: - assert ( - len(terms_set.intersection(get_terms_set(result))) > 0 - or result["creator"] == item["creator"] - ), f"{terms_set} {get_terms_set(result)}/{result['creator']}-{item['creator']}" - - assert result["license_version"] is not None - assert result["attribution"] is not None - assert result["creator_url"] is not None - - -def sensitive_search_and_detail(media_type): - search_res = requests.get( - f"{API_URL}/v1/{media_type}/", - params={"q": "bird", "unstable__include_sensitive_results": "true"}, - verify=False, - ) - results = search_res.json()["results"] - - sensitive_result = None - sensitivities = [] - for result in results: - if sensitivities := result["unstable__sensitivity"]: - sensitive_result = result - break - assert sensitive_result is not None - assert len(sensitivities) != 0 - - detail_res = requests.get( - f"{API_URL}/v1/{media_type}/{sensitive_result['id']}", verify=False - ) - details = detail_res.json() - - assert sensitivities == details["unstable__sensitivity"] diff --git a/api/test/test_audio_integration.py b/api/test/test_audio_integration.py deleted file mode 100644 index 6bf1796d682..00000000000 --- a/api/test/test_audio_integration.py +++ /dev/null @@ -1,179 +0,0 @@ -""" -End-to-end API tests for audio. - -Can be used to verify a live deployment is functioning as designed. -Run with the `pytest -s` command from this directory. 
-""" - -import json - -import pytest -import requests -from django_redis import get_redis_connection - -from api.utils.check_dead_links import CACHE_PREFIX -from test.constants import API_URL -from test.media_integration import ( - creator_collection, - detail, - license_filter_case_insensitivity, - related, - report, - search, - search_all_excluded, - search_by_category, - search_consistency, - search_quotes, - search_quotes_exact, - search_source_and_excluded, - search_special_chars, - sensitive_search_and_detail, - source_collection, - stats, - tag_collection, - uuid_validation, -) - - -@pytest.fixture -def force_result_validity(): - statuses = {} - - def force_validity(query_response): - nonlocal statuses - new_statuses = { - f"{CACHE_PREFIX}{item['url']}": 200 for item in query_response["results"] - } - statuses |= new_statuses - with get_redis_connection() as redis: - redis.mset(new_statuses) - - yield force_validity - - with get_redis_connection() as redis: - redis.delete(*list(statuses.keys())) - - -@pytest.fixture -def audio_fixture(force_result_validity): - res = requests.get(f"{API_URL}/v1/audio/", verify=False) - parsed = res.json() - force_result_validity(parsed) - assert res.status_code == 200 - return parsed - - -@pytest.fixture -def jamendo_audio_fixture(force_result_validity): - """ - Get an audio object specifically from the Jamendo provider. - - Thumbnail tests must use Jamendo results because the Wikimedia - sample audio results do not have thumbnails. 
- """ - res = requests.get( - f"{API_URL}/v1/audio/", - data={"source": "jamendo"}, - verify=False, - ) - parsed = res.json() - force_result_validity(parsed) - assert res.status_code == 200 - return parsed - - -def test_search(audio_fixture): - search(audio_fixture) - - -def test_search_category_filtering(audio_fixture): - search_by_category("audio", "music", audio_fixture) - search_by_category("audio", "pronunciation", audio_fixture) - - -def test_search_category_filtering_fails(audio_fixture): - with pytest.raises(AssertionError): - search_by_category("audio", "not_valid", audio_fixture) - - -def test_search_all_excluded(): - search_all_excluded("audio", ["freesound", "jamendo", "wikimedia_audio"]) - - -def test_search_source_and_excluded(): - search_source_and_excluded("audio") - - -def test_search_quotes(): - search_quotes("audio", "love") - - -def test_search_quotes_exact(): - # ``dancing penguins`` returns different results when quoted vs unquoted - search_quotes_exact("audio", "dancing penguins") - - -def test_search_with_special_characters(): - search_special_chars("audio", "love") - - -def test_search_consistency(): - n_pages = 5 - search_consistency("audio", n_pages) - - -def test_audio_detail(audio_fixture): - detail("audio", audio_fixture) - - -def test_audio_stats(): - stats("audio") - - -def test_audio_detail_without_thumb(): - resp = requests.get(f"{API_URL}/v1/audio/44540200-91eb-483d-9e99-38ce86a52fb6") - assert resp.status_code == 200 - parsed = json.loads(resp.text) - assert parsed["thumbnail"] is None - - -def test_audio_search_without_thumb(): - """The first audio of this search should not have a thumbnail.""" - resp = requests.get(f"{API_URL}/v1/audio/?q=zaus") - assert resp.status_code == 200 - parsed = json.loads(resp.text) - assert parsed["results"][0]["thumbnail"] is None - - -def test_audio_report(audio_fixture): - report("audio", audio_fixture) - - -def test_audio_license_filter_case_insensitivity(): - 
license_filter_case_insensitivity("audio") - - -def test_audio_uuid_validation(): - uuid_validation("audio", "123456789123456789123456789123456789") - uuid_validation("audio", "12345678-1234-5678-1234-1234567891234") - uuid_validation("audio", "abcd") - - -def test_audio_related(audio_fixture): - related(audio_fixture) - - -def test_audio_tag_collection(): - tag_collection("audio") - - -def test_audio_source_collection(): - source_collection("audio") - - -def test_audio_creator_collection(): - creator_collection("audio") - - -def test_audio_sensitive_search_and_detail(): - sensitive_search_and_detail("audio") diff --git a/api/test/test_backwards_compat.py b/api/test/test_backwards_compat.py deleted file mode 100644 index 4250aa5d049..00000000000 --- a/api/test/test_backwards_compat.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -Ensures that deprecated URLs are redirected to their updated paths and not left to rot. - -Can be used to verify a live deployment is functioning as designed. -Run with the `pytest -s` command from this directory. 
-""" - -import uuid - -import requests - -from test.constants import API_URL - - -def test_old_stats_endpoint(): - response = requests.get( - f"{API_URL}/v1/sources?type=images", allow_redirects=False, verify=False - ) - assert response.status_code == 301 - assert response.is_permanent_redirect - assert response.headers.get("Location") == "/v1/images/stats/" - - -def test_old_related_images_endpoint(): - idx = uuid.uuid4() - response = requests.get( - f"{API_URL}/v1/recommendations/images/{idx}", - allow_redirects=False, - verify=False, - ) - assert response.status_code == 301 - assert response.is_permanent_redirect - assert response.headers.get("Location") == f"/v1/images/{idx}/related/" - - -def test_old_oembed_endpoint(): - response = requests.get( - f"{API_URL}/v1/oembed?key=value", allow_redirects=False, verify=False - ) - assert response.status_code == 301 - assert response.is_permanent_redirect - assert response.headers.get("Location") == "/v1/images/oembed/?key=value" - - -def test_old_thumbs_endpoint(): - idx = uuid.uuid4() - response = requests.get( - f"{API_URL}/v1/thumbs/{idx}", allow_redirects=False, verify=False - ) - assert response.status_code == 301 - assert response.is_permanent_redirect - assert response.headers.get("Location") == f"/v1/images/{idx}/thumb/" diff --git a/api/test/test_image_integration.py b/api/test/test_image_integration.py deleted file mode 100644 index 34dcb840b87..00000000000 --- a/api/test/test_image_integration.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -End-to-end API tests for images. - -Can be used to verify a live deployment is functioning as designed. -Run with the `pytest -s` command from this directory. 
-""" - -import json -from urllib.parse import urlencode - -import pytest -import requests - -from test.constants import API_URL -from test.media_integration import ( - creator_collection, - detail, - license_filter_case_insensitivity, - related, - report, - search, - search_all_excluded, - search_consistency, - search_quotes, - search_quotes_exact, - search_source_and_excluded, - search_special_chars, - sensitive_search_and_detail, - source_collection, - stats, - tag_collection, - uuid_validation, -) - - -identifier = "cdbd3bf6-1745-45bb-b399-61ee149cd58a" - - -@pytest.fixture -def image_fixture(): - response = requests.get(f"{API_URL}/v1/images?q=dog", verify=False) - assert response.status_code == 200 - parsed = json.loads(response.text) - return parsed - - -def test_search(image_fixture): - search(image_fixture) - - -def test_search_all_excluded(): - search_all_excluded("images", ["flickr", "stocksnap"]) - - -def test_search_source_and_excluded(): - search_source_and_excluded("images") - - -def test_search_quotes(): - search_quotes("images", "dog") - - -def test_search_quotes_exact(): - # ``dancing penguins`` returns different results when quoted vs unquoted - search_quotes_exact("images", "dancing penguins") - - -def test_search_with_special_characters(): - search_special_chars("images", "dog") - - -def test_search_consistency(): - n_pages = 5 - search_consistency("images", n_pages) - - -def test_image_detail(image_fixture): - detail("images", image_fixture) - - -def test_image_stats(): - stats("images") - - -def test_audio_report(image_fixture): - report("images", image_fixture) - - -@pytest.mark.parametrize( - "url, expected_status_code", - [ - pytest.param( - f"https://any.domain/any/path/{identifier}", - 200, - id="OK; no trailing slash", - ), - pytest.param( - f"https://any.domain/any/path/{identifier}/", - 200, - id="OK; with trailing slash", - ), # trailing slash - pytest.param( - "https://any.domain/any/path/00000000-0000-0000-0000-000000000000", - 404, 
- id="Valid UUID but no matching identifier", - ), - pytest.param( - "https://any.domain/any/path/not-a-valid-uuid", - 400, - id="not a valid UUID", - ), - ], -) -def test_oembed_endpoint(url, expected_status_code): - params = {"url": url} - response = requests.get( - f"{API_URL}/v1/images/oembed?{urlencode(params)}", verify=False - ) - assert response.status_code == expected_status_code - - -def test_oembed_endpoint_for_json(): - params = { - "url": f"https://any.domain/any/path/{identifier}", - # 'format': 'json' is the default - } - response = requests.get( - f"{API_URL}/v1/images/oembed?{urlencode(params)}", verify=False - ) - assert response.status_code == 200 - assert response.headers["Content-Type"] == "application/json" - - parsed = response.json() - assert parsed["width"] == 1024 - assert parsed["height"] == 683 - assert parsed["license_url"] == "https://creativecommons.org/licenses/by/2.0/" - - -def test_image_license_filter_case_insensitivity(): - license_filter_case_insensitivity("images") - - -def test_image_uuid_validation(): - uuid_validation("images", "123456789123456789123456789123456789") - uuid_validation("images", "12345678-1234-5678-1234-1234567891234") - uuid_validation("images", "abcd") - - -def test_image_tag_collection(): - tag_collection("images", "cat") - - -def test_image_tag_collection_case_sensitive(): - tag_collection("images", "Cat") - - -def test_image_source_collection(): - source_collection("images") - - -def test_image_creator_collection(): - creator_collection("images") - - -def test_image_related(image_fixture): - related(image_fixture) - - -def test_image_sensitive_search_and_detail(): - sensitive_search_and_detail("images") diff --git a/api/test/test_v1_integration.py b/api/test/test_v1_integration.py deleted file mode 100644 index b29c57f4a8f..00000000000 --- a/api/test/test_v1_integration.py +++ /dev/null @@ -1,247 +0,0 @@ -""" -End-to-end API tests. - -Can be used to verify a live deployment is functioning as designed. 
-Run with the `pytest -s` command from this directory. -""" - -import json - -import pytest -import requests - -from api.constants.licenses import LICENSE_GROUPS -from api.models import Image -from api.utils.watermark import watermark -from test.constants import API_URL - - -@pytest.fixture -def image_fixture(): - response = requests.get(f"{API_URL}/v1/images?q=dog", verify=False) - assert response.status_code == 200 - parsed = json.loads(response.text) - return parsed - - -def test_link_shortener_create(): - payload = {"full_url": "abcd"} - response = requests.post(f"{API_URL}/v1/link/", json=payload, verify=False) - assert response.status_code == 410 - - -def test_link_shortener_resolve(): - response = requests.get(f"{API_URL}/v1/link/abc", verify=False) - assert response.status_code == 410 - - -@pytest.mark.skip(reason="Disabled feature") -@pytest.fixture -def test_list_create(image_fixture): - payload = { - "title": "INTEGRATION TEST", - "images": [image_fixture["results"][0]["id"]], - } - response = requests.post(f"{API_URL}/list", json=payload, verify=False) - parsed_response = json.loads(response.text) - assert response.status_code == 201 - return parsed_response - - -@pytest.mark.skip(reason="Disabled feature") -def test_list_detail(test_list_create): - list_slug = test_list_create["url"].split("/")[-1] - response = requests.get(f"{API_URL}/list/{list_slug}", verify=False) - assert response.status_code == 200 - - -@pytest.mark.skip(reason="Disabled feature") -def test_list_delete(test_list_create): - list_slug = test_list_create["url"].split("/")[-1] - token = test_list_create["auth"] - headers = {"Authorization": f"Token {token}"} - response = requests.delete( - f"{API_URL}/list/{list_slug}", headers=headers, verify=False - ) - assert response.status_code == 204 - - -def test_license_type_filtering(): - """Ensure that multiple license type filters interact together correctly.""" - - commercial = LICENSE_GROUPS["commercial"] - modification = 
LICENSE_GROUPS["modification"] - commercial_and_modification = set.intersection(modification, commercial) - response = requests.get( - f"{API_URL}/v1/images?q=dog&license_type=commercial,modification", verify=False - ) - parsed = json.loads(response.text) - for result in parsed["results"]: - assert result["license"] in commercial_and_modification - - -def test_single_license_type_filtering(): - commercial = LICENSE_GROUPS["commercial"] - response = requests.get( - f"{API_URL}/v1/images?q=dog&license_type=commercial", verify=False - ) - parsed = json.loads(response.text) - for result in parsed["results"]: - assert result["license"] in commercial - - -def test_specific_license_filter(): - response = requests.get(f"{API_URL}/v1/images?q=dog&license=by", verify=False) - parsed = json.loads(response.text) - for result in parsed["results"]: - assert result["license"] == "by" - - -def test_creator_quotation_grouping(): - """Test that quotation marks can be used to narrow down search results.""" - - no_quotes = json.loads( - requests.get(f"{API_URL}/v1/images?creator=Steve%20Wedgwood", verify=False).text - ) - quotes = json.loads( - requests.get( - f'{API_URL}/v1/images?creator="Steve%20Wedgwood"', verify=False - ).text - ) - # Did quotation marks actually narrow down the search? - assert len(no_quotes["results"]) > len(quotes["results"]) - # Did we find only William Ford Stanley works, or also by others? 
- for result in quotes["results"]: - assert "Steve Wedgwood" in result["creator"] - - -@pytest.mark.skip(reason="Unmaintained feature/grequests ssl recursion bug") -def test_watermark_preserves_exif(): - img_with_exif = ( - "https://raw.githubusercontent.com/ianare/exif-samples/" - "master/jpg/Canon_PowerShot_S40.jpg" - ) - info = { - "title": "test", - "creator": "test", - "license": "test", - "license_version": "test", - } - _, exif = watermark(image_url=img_with_exif, info=info) - assert exif is not None - - img_no_exif = ( - "https://creativecommons.org/wp-content/uploads/" - "2019/03/9467312978_64cd5d2f3b_z.jpg" - ) - _, no_exif = watermark(image_url=img_no_exif, info=info) - assert no_exif is None - - -def test_attribution(): - """ - Check that the API includes an attribution string. - - Since there are some works where the title or creator is not known, the format of - the attribution string can need to be tweaked slightly. - """ - - title_and_creator_missing = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=None, - creator=None, - license="by", - license_version="3.0", - ) - assert "This work" in title_and_creator_missing.attribution - - title = "A foo walks into a bar" - creator_missing = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=title, - creator=None, - license="by", - license_version="3.0", - ) - assert title in creator_missing.attribution - assert "by " not in creator_missing.attribution - - creator = "John Doe" - title_missing = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=None, - creator=creator, - license="by", - license_version="3.0", - ) - assert creator in title_missing.attribution - assert "This work" in title_missing.attribution - - all_data_present = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title=title, - creator=creator, - license="by", - license_version="3.0", - ) - assert title in all_data_present.attribution - assert creator in 
all_data_present.attribution - - -def test_license_override(): - null_license_url = Image( - identifier="ab80dbe1-414c-4ee8-9543-f9599312aeb8", - title="test", - creator="test", - license="by", - license_version="3.0", - meta_data={"license_url": "null"}, - ) - assert null_license_url.license_url is not None - - -def test_source_search(): - response = requests.get(f"{API_URL}/v1/images?source=flickr", verify=False) - if response.status_code != 200: - print(f"Request failed. Message: {response.body}") - assert response.status_code == 200 - parsed = json.loads(response.text) - assert parsed["result_count"] > 0 - - -def test_extension_filter(): - response = requests.get(f"{API_URL}/v1/images?q=dog&extension=jpg") - parsed = json.loads(response.text) - for result in parsed["results"]: - assert ".jpg" in result["url"] - - -@pytest.fixture -def recommendation_factory(): - """Allow passing url parameters along with a related images request.""" - - def _parameterized_search(identifier, **kwargs): - response = requests.get( - f"{API_URL}/v1/recommendations?type=images&id={identifier}", - params=kwargs, - verify=False, - ) - assert response.status_code == 200 - parsed = response.json() - return parsed - - return _parameterized_search - - -@pytest.mark.skip( - reason="Generally, we don't paginate related images, so " - "consistency is less of an issue." 
-) -def test_related_image_search_page_consistency( - recommendation, search_without_dead_links -): - initial_images = search_without_dead_links(q="*", page_size=10) - for image in initial_images["results"]: - related = recommendation_factory(image["id"]) - assert related["result_count"] > 0 - assert len(related["results"]) == 10 diff --git a/api/test/unit/conftest.py b/api/test/unit/conftest.py index ad70cd77965..674b97a6eab 100644 --- a/api/test/unit/conftest.py +++ b/api/test/unit/conftest.py @@ -1,8 +1,6 @@ from dataclasses import dataclass from unittest.mock import MagicMock -from rest_framework.test import APIClient, APIRequestFactory - import pook import pytest from elasticsearch import Elasticsearch @@ -39,11 +37,6 @@ ) -@pytest.fixture -def api_client(): - return APIClient() - - @pytest.fixture(autouse=True) def sentry_capture_exception(monkeypatch): mock = MagicMock() @@ -52,13 +45,6 @@ def sentry_capture_exception(monkeypatch): yield mock -@pytest.fixture -def request_factory() -> APIRequestFactory(): - request_factory = APIRequestFactory(defaults={"REMOTE_ADDR": "192.0.2.1"}) - - return request_factory - - @dataclass class MediaTypeConfig: media_type: str @@ -158,3 +144,12 @@ def cleanup_elasticsearch_test_documents(request, settings): query={"match": {"tags.name": CREATED_BY_FIXTURE_MARKER}}, refresh=True, ) + + +__all__ = [ + "sentry_capture_exception", + "image_media_type_config", + "audio_media_type_config", + "media_type_config", + "cleanup_elasticsearch_test_documents", +] diff --git a/api/test/unit/models/test_media.py b/api/test/unit/models/test_media.py new file mode 100644 index 00000000000..6d7d3b00e67 --- /dev/null +++ b/api/test/unit/models/test_media.py @@ -0,0 +1,56 @@ +import pytest + +from api.models import Audio, Image + + +media_type_params = pytest.mark.parametrize( + "media_type, media_model", + [ + ("image", Image), + ("audio", Audio), + ], +) + + +@media_type_params +@pytest.mark.parametrize( + "fields, attribution", + [ + ( + 
["title", "creator"], + '"A foo walks into a bar" by John Doe is licensed under CC BY 3.0.', + ), + (["title"], '"A foo walks into a bar" is licensed under CC BY 3.0.'), + (["creator"], "This work by John Doe is licensed under CC BY 3.0."), + ([], "This work is licensed under CC BY 3.0."), + ], +) +def test_attribution_handles_missing_title_or_creator( + media_type, media_model, fields, attribution +): + field_values = { + "title": "A foo walks into a bar", + "creator": "John Doe", + } + + obj = media_model( + license="by", + license_version="3.0", + ) + for field in fields: + setattr(obj, field, field_values[field]) + + assert attribution in obj.attribution + assert ( + "To view a copy of this license, " + "visit https://creativecommons.org/licenses/by/3.0/." + ) in obj.attribution + + +@media_type_params +def test_license_url_is_generated_if_missing(media_type, media_model): + obj = media_model( + license="by", + license_version="3.0", + ) + assert obj.license_url is not None