
Commit

Merge branch 'main' of https://github.com/WordPress/openverse into in…t_test_using_client
dhruvkb committed Dec 20, 2023
2 parents de4d65f + f56f731 commit a625c09
Showing 28 changed files with 259 additions and 287 deletions.
8 changes: 2 additions & 6 deletions .github/ISSUE_TEMPLATE/project_thread.md
@@ -5,10 +5,6 @@ labels: "🧭 project: thread"
title: "<Replace this with actual title>"
---

| Start Date | ETA | Project Lead | Actual Ship Date |
| ---------- | ---------- | ------------ | ---------------- |
| YYYY-MM-DD | YYYY-MM-DD | TBD | TBD |

## Description

<!-- Describe the feature in simple terms understandable by a non-technical audience who is unfamiliar with the internal workings of Openverse. Limit specific implementation details that are better served by the project's other documents. -->
@@ -20,9 +16,9 @@ title: "<Replace this with actual title>"
- [ ] Project Proposal
- [ ] Implementation Plan(s)

## Issues
## Milestones/Issues

<!-- Link to all of the issues that must be completed as part of this project, typically those created after implementation planning but occasionally existing ones. -->
<!-- Link to all the milestones or issues that must be completed as part of this project, typically those created after implementation planning but occasionally existing ones. -->

## Prior Art

48 changes: 4 additions & 44 deletions .pre-commit-config.yaml
@@ -35,53 +35,13 @@ repos:
- id: check-docstring-first
- id: requirements-txt-fixer

# Use the `.isort.cfg` file to configure additional project-specific requirements.
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.8
hooks:
- id: isort
files: \.py$
exclude: ^build/.*$|^.tox/.*$|^venv/.*$
args:
- --profile=black
- --lines-after-imports=2

- repo: https://github.com/asottile/pyupgrade
rev: v3.15.0
hooks:
- id: pyupgrade
args:
- --py310-plus

- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: "v0.1.7"
hooks:
- id: ruff
- id: ruff # replaces Flake8, isort, pydocstyle, pyupgrade
args:
- --fix

- repo: https://github.com/ambv/black
rev: 23.11.0
hooks:
- id: black
args:
- --safe

- repo: https://github.com/pycqa/pydocstyle
rev: 6.3.0
hooks:
- id: pydocstyle
args:
- --convention=pep257
# Additional ignore reasons:
# D1xx: we do not want to force contributors to write redundant or useless docstrings
# D202: additional whitespace helps with readability
# D205: we don't want to always require a single line summary
# D211: same as D202
# D400: first line doesn't need to end in a period
# See the following documentation for what each rule does:
# https://www.pydocstyle.org/en/6.2.3/error_codes.html#error-codes
- --add-ignore=D1,D202,D205,D211,D400
- id: ruff-format # replaces Black

# Use the `.prettierignore` and `.prettier.config.js` files to configure project-specific requirements.
- repo: https://github.com/pre-commit/mirrors-prettier
2 changes: 1 addition & 1 deletion api/api/views/image_views.py
@@ -153,7 +153,7 @@ async def thumbnail(self, *args, **kwargs):
@action(detail=True, url_path="watermark", url_name="watermark")
def watermark(self, request, *_, **__): # noqa: D401
"""
This endpoint is deprecated.
Note that this endpoint is deprecated.
---
71 changes: 18 additions & 53 deletions api/test/fixtures/asynchronous.py
@@ -1,53 +1,18 @@
import asyncio

import pytest

from conf.asgi import application


@pytest.fixture
def get_new_loop():
loops: list[asyncio.AbstractEventLoop] = []

def _get_new_loop() -> asyncio.AbstractEventLoop:
loop = asyncio.new_event_loop()
loops.append(loop)
return loop

yield _get_new_loop

for loop in loops:
loop.close()


@pytest.fixture(scope="session")
def session_loop() -> asyncio.AbstractEventLoop:
loop = asyncio.new_event_loop()
yield loop
loop.close()


@pytest.fixture(scope="session", autouse=True)
def ensure_asgi_lifecycle(session_loop: asyncio.AbstractEventLoop):
"""
Call application shutdown lifecycle event.
This cannot be an async fixture because the scope is session
and pytest-asyncio's `event_loop` fixture, which is auto-used
for async tests and fixtures, is function scoped, which is
incompatible with session scoped fixtures. `async_to_sync` works
fine here, so it's not a problem.
This cannot yet call the startup signal due to:
https://github.com/illagrenan/django-asgi-lifespan/pull/80
"""
scope = {"type": "lifespan"}

async def noop(*args, **kwargs):
...

async def shutdown():
return {"type": "lifespan.shutdown"}

yield
session_loop.run_until_complete(application(scope, shutdown, noop))
from test.fixtures.asynchronous import ensure_asgi_lifecycle, get_new_loop, session_loop
from test.fixtures.cache import (
django_cache,
redis,
unreachable_django_cache,
unreachable_redis,
)


__all__ = [
"ensure_asgi_lifecycle",
"get_new_loop",
"session_loop",
"django_cache",
"redis",
"unreachable_django_cache",
"unreachable_redis",
]
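
The fixtures re-exported above are consumed like any other pytest fixture. A minimal sketch of a test using `get_new_loop` (the test name and body are illustrative, not part of this commit):

    def test_runs_coroutine_on_fresh_loop(get_new_loop):
        async def answer() -> int:
            return 42

        # The fixture hands back a new event loop and closes it at teardown.
        loop = get_new_loop()
        assert loop.run_until_complete(answer()) == 42
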
4 changes: 1 addition & 3 deletions api/test/unit/controllers/test_search_controller.py
@@ -630,9 +630,7 @@ def test_no_post_process_results_recursion(
# Ensure dead link filtering does not remove any results
pook.head(
pook.regex(rf"{MOCK_LIVE_RESULT_URL_PREFIX}/\d"),
).times(
hit_count
).reply(200)
).times(hit_count).reply(200)

serializer = image_media_type_config.search_request_serializer(
# This query string does not matter, ultimately, as pook is mocking
4 changes: 1 addition & 3 deletions api/test/unit/utils/test_image_proxy.py
@@ -71,9 +71,7 @@ def auth_key():

@pytest.fixture
def photon_get(session_loop):
"""
Run ``image_proxy.get`` and wait for all tasks to finish.
"""
"""Run ``image_proxy.get`` and wait for all tasks to finish."""

def do(*args, **kwargs):
try:
6 changes: 3 additions & 3 deletions api/test/unit/utils/test_watermark.py
@@ -17,9 +17,9 @@
@dataclass
class RequestsFixture:
requests: list[Request]
response_factory: Callable[ # noqa: E731
[Request], Response
] = lambda x: RequestsFixture._default_response_factory(x)
response_factory: Callable[[Request], Response] = ( # noqa: E731
lambda x: RequestsFixture._default_response_factory(x)
)

@staticmethod
def _default_response_factory(req: Request) -> Response:
6 changes: 3 additions & 3 deletions api/test/unit/utils/test_waveform.py
@@ -19,9 +19,9 @@
@dataclass
class RequestsFixture:
requests: list[Request]
response_factory: Callable[ # noqa: E731
[Request], Response
] = lambda x: RequestsFixture._default_response_factory(x)
response_factory: Callable[[Request], Response] = ( # noqa: E731
lambda x: RequestsFixture._default_response_factory(x)
)

@staticmethod
def _default_response_factory(req: Request) -> Response:
4 changes: 1 addition & 3 deletions api/test/unit/views/test_media_views.py
@@ -32,9 +32,7 @@ def test_list_query_count(api_client, media_type_config):
), patch(
"api.serializers.media_serializers.search_controller",
get_sources=MagicMock(return_value={}),
), pytest_django.asserts.assertNumQueries(
1
):
), pytest_django.asserts.assertNumQueries(1):
res = api_client.get(f"/v1/{media_type_config.url_prefix}/")

assert res.status_code == 200
3 changes: 1 addition & 2 deletions catalog/dags/common/loader/reporting.py
@@ -93,8 +93,7 @@ def skip_report_completion(
) -> bool:
return (
# Duration must be provided and be a value greater than 1 second
duration is None
or duration in ("inf", "less than 1 sec")
duration is None or duration in ("inf", "less than 1 sec")
) and (
# Record counts by media type must be provided and at least one value must
# be truthy (i.e. not None)
46 changes: 46 additions & 0 deletions catalog/dags/common/sensors/utils.py
@@ -1,6 +1,12 @@
from datetime import datetime

from airflow.decorators import task
from airflow.exceptions import AirflowSensorTimeout
from airflow.models import DagRun
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State

from common.constants import REFRESH_POKE_INTERVAL


def get_most_recent_dag_run(dag_id) -> list[datetime] | datetime:
@@ -27,3 +33,43 @@ def get_most_recent_dag_run(dag_id) -> list[datetime] | datetime:
# ``ExternalTaskSensor::get_count``, especially the handling
# of ``dttm_filter`` for the relevant implementation details.
return []


def wait_for_external_dag(external_dag_id: str, task_id: str | None = None):
"""
Return a Sensor task which will wait if the given external DAG is
running.
"""
if not task_id:
task_id = f"wait_for_{external_dag_id}"

return ExternalTaskSensor(
task_id=task_id,
poke_interval=REFRESH_POKE_INTERVAL,
external_dag_id=external_dag_id,
# Wait for the whole DAG, not just a part of it
external_task_id=None,
check_existence=False,
execution_date_fn=lambda _: get_most_recent_dag_run(external_dag_id),
mode="reschedule",
# Any "finished" state is sufficient for us to continue
allowed_states=[State.SUCCESS, State.FAILED],
)


@task(retries=0)
def prevent_concurrency_with_dag(external_dag_id: str, **context):
"""
Prevent concurrency with the given external DAG, by failing
immediately if that DAG is running.
"""

wait_for_dag = wait_for_external_dag(
external_dag_id=external_dag_id,
task_id=f"check_for_running_{external_dag_id}",
)
wait_for_dag.timeout = 0
try:
wait_for_dag.execute(context)
except AirflowSensorTimeout:
raise ValueError(f"Concurrency check with {external_dag_id} failed.")
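
The two helpers added above are intended to be dropped into DAG definitions elsewhere in the catalog. A minimal sketch of wiring `prevent_concurrency_with_dag` into a DAG (the DAG id, schedule, start date, and external DAG id below are illustrative assumptions, not taken from this commit):

    from datetime import datetime

    from airflow import DAG

    from common.sensors.utils import prevent_concurrency_with_dag

    with DAG(
        dag_id="example_unscheduled_dag",
        schedule=None,
        start_date=datetime(2023, 1, 1),
    ) as dag:
        # Fails this run immediately if the external DAG is currently running.
        check = prevent_concurrency_with_dag(external_dag_id="audio_data_refresh")
        # Downstream tasks would be chained after `check`.
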
37 changes: 5 additions & 32 deletions catalog/dags/data_refresh/create_filtered_index_dag.py
@@ -53,13 +53,10 @@
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowSensorTimeout
from airflow.models.param import Param
from airflow.sensors.external_task import ExternalTaskSensor

from common.constants import DAG_DEFAULT_ARGS
from common.sensors.utils import get_most_recent_dag_run
from common.sensors.utils import prevent_concurrency_with_dag
from data_refresh.create_filtered_index import (
create_filtered_index_creation_task_groups,
)
@@ -82,31 +79,6 @@ def create_filtered_index_creation_dag(data_refresh: DataRefresh):
"""
media_type = data_refresh.media_type

@task(
task_id=f"prevent_concurrency_with_{media_type}_data_refresh",
)
def prevent_concurrency_with_data_refresh(**context):
data_refresh_dag_id = f"{media_type}_data_refresh"
wait_for_filtered_index_creation = ExternalTaskSensor(
task_id="check_for_running_data_refresh",
external_dag_id=data_refresh_dag_id,
# Set timeout to 0 to prevent retries. If the data refresh DAG is running,
# immediately fail the filtered index creation DAG.
timeout=0,
# Wait for the whole DAG, not just a part of it
external_task_id=None,
check_existence=False,
execution_date_fn=lambda _: get_most_recent_dag_run(data_refresh_dag_id),
mode="reschedule",
)
try:
wait_for_filtered_index_creation.execute(context)
except AirflowSensorTimeout:
raise ValueError(
f"{media_type} data refresh concurrency check failed. "
"Filtered index creation cannot start during a data refresh."
)

with DAG(
dag_id=f"create_filtered_{media_type}_index",
default_args=DAG_DEFAULT_ARGS,
@@ -141,9 +113,10 @@ def prevent_concurrency_with_data_refresh(**context):
},
render_template_as_native_obj=True,
) as dag:
prevent_concurrency = prevent_concurrency_with_data_refresh.override(
retries=0
)()
# Immediately fail if the associated data refresh is running.
prevent_concurrency = prevent_concurrency_with_dag.override(
task_id=f"prevent_concurrency_with_{media_type}_data_refresh"
)(external_dag_id=f"{media_type}_data_refresh")

# Once the concurrency check has passed, actually create the filtered
# index.
26 changes: 3 additions & 23 deletions catalog/dags/data_refresh/data_refresh_task_factory.py
@@ -49,15 +49,13 @@
from collections.abc import Sequence

from airflow.models.baseoperator import chain
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from common import ingestion_server
from common.constants import XCOM_PULL_TEMPLATE
from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
from common.sensors.utils import get_most_recent_dag_run
from common.sensors.utils import wait_for_external_dag
from data_refresh.create_filtered_index import (
create_filtered_index_creation_task_groups,
)
@@ -120,27 +118,9 @@ def create_data_refresh_task_group(
# filtered index creation process, even if it was triggered immediately after
# filtered index creation. However, it is safer to avoid the possibility
# of the race condition altogether.
# ``execution_date_fn`` is used to find the most recent run because
# the filtered index creation DAGs are unscheduled so we can't derive
# anything from the execution date of the current data refresh DAG.
create_filtered_index_dag_id = (
f"create_filtered_{data_refresh.media_type}_index"
wait_for_filtered_index_creation = wait_for_external_dag(
external_dag_id=f"create_filtered_{data_refresh.media_type}_index",
)
wait_for_filtered_index_creation = ExternalTaskSensor(
task_id="wait_for_create_and_populate_filtered_index",
external_dag_id=create_filtered_index_dag_id,
# Wait for the whole DAG, not just a part of it
external_task_id=None,
check_existence=False,
poke_interval=data_refresh.filtered_index_poke_interval,
execution_date_fn=lambda _: get_most_recent_dag_run(
create_filtered_index_dag_id
),
mode="reschedule",
# Any "finished" state is sufficient for us to continue.
allowed_states=[State.SUCCESS, State.FAILED],
)

tasks.append([wait_for_data_refresh, wait_for_filtered_index_creation])

# Get the index currently mapped to our target alias, to delete later.