Unify DAG creation/database cleaning fixtures for testing #2622

Closed
AetherUnbound opened this issue Jul 11, 2023 · 4 comments · Fixed by #3361
Labels:
🤖 aspect: dx (Concerns developers' experience with the codebase)
✨ goal: improvement (Improvement to an existing user-facing feature)
good first issue (New-contributor friendly)
help wanted (Open to participation from the community)
🟩 priority: low (Low priority and doesn't need to be rushed)
🧱 stack: catalog (Related to the catalog and Airflow DAGs)
🔧 tech: airflow (Involves Apache Airflow)
🐍 tech: pytest (Involves pytest)
🐍 tech: python (Involves Python)

Comments

@AetherUnbound
Collaborator

Description

We have a few places in our catalog tests with very similar logic for creating the DAG necessary for running some tests. Examples:

import pytest
from airflow.models import DagRun, Pool, TaskInstance
from airflow.utils.session import create_session

# Each variant below comes from a different test module; DAG_PREFIX, TEST_DAG_ID,
# DAG_ID and TEST_POOL are constants defined in that module.

# Variant 1: prefix match on DagRun and TaskInstance, plus the test pool
@pytest.fixture(autouse=True)
def clean_db():
    with create_session() as session:
        # synchronize_session='fetch' required here to refresh models
        # https://stackoverflow.com/a/51222378 CC BY-SA 4.0
        session.query(DagRun).filter(DagRun.dag_id.startswith(DAG_PREFIX)).delete(
            synchronize_session="fetch"
        )
        session.query(TaskInstance).filter(
            TaskInstance.dag_id.startswith(DAG_PREFIX)
        ).delete(synchronize_session="fetch")
        # Compare against the Pool.pool name column; the builtin `id` never matches
        session.query(Pool).filter(Pool.pool == TEST_POOL).delete()

# Variant 2: exact match on DagRun only
@pytest.fixture(autouse=True)
def clean_db():
    with create_session() as session:
        session.query(DagRun).filter(DagRun.dag_id == TEST_DAG_ID).delete()

# Variant 3: prefix match on DagRun and TaskInstance
@pytest.fixture(autouse=True)
def clean_db():
    with create_session() as session:
        # synchronize_session='fetch' required here to refresh models
        # https://stackoverflow.com/a/51222378 CC BY-SA 4.0
        session.query(DagRun).filter(DagRun.dag_id.startswith(TEST_DAG_ID)).delete(
            synchronize_session="fetch"
        )
        session.query(TaskInstance).filter(
            TaskInstance.dag_id.startswith(TEST_DAG_ID)
        ).delete(synchronize_session="fetch")

# Variant 4: identical to variant 2
@pytest.fixture(autouse=True)
def clean_db():
    with create_session() as session:
        session.query(DagRun).filter(DagRun.dag_id == TEST_DAG_ID).delete()

# Variant 5: exact match, not autouse, cleans both before and after the test
def _clean_dag_from_db():
    with create_session() as session:
        session.query(DagRun).filter(DagRun.dag_id == DAG_ID).delete()
        session.query(TaskInstance).filter(TaskInstance.dag_id == DAG_ID).delete()

@pytest.fixture()
def clean_db():
    _clean_dag_from_db()
    yield
    _clean_dag_from_db()

Although there are some slight differences, it would be ideal if we could find a way to combine these into a single fixture that's used within all the affected tests, rather than redefining the fixture for every test.
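The differences between the variants boil down to three choices: match the DAG ID exactly or as a prefix, also delete TaskInstance rows or not, and remove a test pool or not. The sketch below expresses that as one parameterized function. It is a stand-in, not the catalog's code: it assumes the standard-library sqlite3 module and hypothetical, simplified `dag_run`, `task_instance` and `slot_pool` tables in place of Airflow's ORM models, and the names `make_demo_db`, `clean_test_rows` and its keyword arguments are made up for illustration. In a real conftest.py the same branches would pick between `DagRun.dag_id == ...` and `DagRun.dag_id.startswith(...)` inside `create_session()`.

```python
import sqlite3
from typing import Optional


def make_demo_db() -> sqlite3.Connection:
    """Hypothetical in-memory stand-in for the Airflow tables the fixtures touch."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE dag_run (dag_id TEXT);
        CREATE TABLE task_instance (dag_id TEXT);
        CREATE TABLE slot_pool (pool TEXT);
        """
    )
    return conn


def clean_test_rows(
    conn: sqlite3.Connection,
    dag_id: str,
    *,
    prefix: bool = False,
    task_instances: bool = False,
    pool: Optional[str] = None,
) -> None:
    """Delete test rows; the keyword flags cover the differences between the variants."""
    if not dag_id:
        raise ValueError("dag_id must be a non-empty string")
    if prefix:
        # substr() instead of LIKE, so a "_" in the DAG ID is not treated as a wildcard
        match = "substr(dag_id, 1, length(?)) = ?"
        params = (dag_id, dag_id)
    else:
        match = "dag_id = ?"
        params = (dag_id,)
    conn.execute(f"DELETE FROM dag_run WHERE {match}", params)
    if task_instances:
        conn.execute(f"DELETE FROM task_instance WHERE {match}", params)
    if pool is not None:
        conn.execute("DELETE FROM slot_pool WHERE pool = ?", (pool,))
    conn.commit()
```

A shared fixture could call such a function before the test and again after a `yield`, which also covers the variant that cleans on both sides.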

Additional context

Came up in discussing #2209 (comment)

@AetherUnbound AetherUnbound added ✨ goal: improvement Improvement to an existing user-facing feature 🐍 tech: pytest Involves pytest 🐍 tech: python Involves Python 🔧 tech: airflow Involves Apache Airflow 🟩 priority: low Low priority and doesn't need to be rushed 🤖 aspect: dx Concerns developers' experience with the codebase 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Jul 11, 2023
@github-project-automation github-project-automation bot moved this to 📋 Backlog in Openverse Backlog Jul 11, 2023
@AetherUnbound AetherUnbound added good first issue New-contributor friendly help wanted Open to participation from the community labels Oct 25, 2023
@ngken0995
Collaborator

@AetherUnbound Can I be assigned to this issue?

@ngken0995
Collaborator

ngken0995 commented Nov 14, 2023

@AetherUnbound @sarayourfriend Here is some code I tried to implement in catalog/test/conftest.py.

import pytest
from airflow.models import DagRun, Pool, TaskInstance
from airflow.utils.session import create_session

# Default values; a test module can define a fixture with the same name to override one.
@pytest.fixture()
def get_test_dag_id():
    return ""

@pytest.fixture()
def get_test_pool():
    return ""

@pytest.fixture()
def isTaskInstance():
    return False

@pytest.fixture()
def isPool():
    return False

@pytest.fixture(autouse=True)
def clean_db(get_test_dag_id, get_test_pool, isTaskInstance, isPool):
    with create_session() as session:
        # synchronize_session='fetch' required here to refresh models
        # https://stackoverflow.com/a/51222378 CC BY-SA 4.0
        session.query(DagRun).filter(DagRun.dag_id.startswith(get_test_dag_id)).delete(
            synchronize_session="fetch"
        )
        if isTaskInstance:
            session.query(TaskInstance).filter(
                TaskInstance.dag_id.startswith(get_test_dag_id)
            ).delete(synchronize_session="fetch")
        if isPool:
            # Compare against the Pool.pool name column, not the builtin `id`
            session.query(Pool).filter(Pool.pool == get_test_pool).delete()

The goal was to define a pytest.fixture for each individual test to override the default get_test_dag_id, get_test_pool, isTaskInstance, and isPool values. I thought clean_db would re-run each time a test ran, but I'm not sure it did. The tests fail when I run just catalog/test. Am I heading in the right direction? How do I make sure clean_db is triggered for every test?
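One detail worth checking in the draft above: the default get_test_dag_id returns an empty string, and `str.startswith("")` is true for every string. SQLAlchemy's `startswith()` compiles to a `LIKE` prefix match, so an empty prefix matches every non-NULL dag_id there as well, meaning a module that forgets to override the default would have clean_db delete every DagRun in the test database. A minimal guard sketch; `require_dag_id` is a hypothetical helper name, not something in the catalog:

```python
def require_dag_id(value: object) -> str:
    """Hypothetical guard: refuse values that would turn a prefix delete into 'delete all'."""
    if not isinstance(value, str) or not value:
        raise ValueError(f"expected a non-empty DAG ID string, got {value!r}")
    return value


# str.startswith("") is True for any string, which is why an empty default is risky.
dag_ids = ["test_dag_a", "ingestion_workflow", "other_dag"]
matched_by_empty_prefix = [d for d in dag_ids if d.startswith("")]

checked = require_dag_id("test_dag")
```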

@sarayourfriend
Collaborator

sarayourfriend commented Nov 15, 2023

As far as getting clean_db to run for each test, what you've got makes sense to me. The only thing is that if you want it to run after the test, you can yield at the start of the function. Pytest runs fixture functions before the test, and if they yield, it calls next() on the generator again after the test is done, sort of like a teardown step. Not sure if that is the issue, though.
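To make the ordering concrete, here is a simplified model of how pytest drives a generator ("yield") fixture: it advances the generator once before the test and once more after it, even when the test fails. This is not pytest's actual implementation; `run_like_pytest` and the two fixture functions are hypothetical names used only to show when the cleanup calls happen.

```python
from typing import Callable, Iterator, List


def run_like_pytest(fixture: Callable[[], Iterator[None]], test: Callable[[], None]) -> None:
    """Simplified model of pytest wrapping one test with a yield fixture."""
    gen = fixture()
    next(gen)  # setup: run the fixture body up to its yield
    try:
        test()
    finally:
        # teardown: resume after the yield; the generator should then be exhausted
        try:
            next(gen)
        except StopIteration:
            pass
        else:
            raise RuntimeError("fixture function yielded more than once")


events: List[str] = []


def clean_before_and_after() -> Iterator[None]:
    events.append("clean")  # like calling _clean_dag_from_db() before the yield
    yield
    events.append("clean")  # and again once the test has finished


def clean_after_only() -> Iterator[None]:
    yield  # yielding first means nothing runs before the test
    events.append("clean")


run_like_pytest(clean_before_and_after, lambda: events.append("test"))
print(events)  # ['clean', 'test', 'clean']
```

Swapping in `clean_after_only` gives the "yield at the start" behaviour, where cleanup only happens after the test.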

I also don't know for sure whether it's possible to create local overrides for fixtures the way you're proposing. I think the right approach for your idea (which sounds good to me) is to use the request fixture to get information from the test's module:

https://docs.pytest.org/en/7.1.x/how-to/fixtures.html#fixtures-can-introspect-the-requesting-test-context

The idea would be to define test_dag_id, test_pool, etc., as variables in the module. They can be functions too, if you want. Then retrieve them using something similar to the code in the example in the docs:

    test_dag_id = getattr(request.module, "test_dag_id", None)
    # probably raise an exception if `test_dag_id` is None?
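A sketch of that lookup with the validation added, assuming module-level names such as `test_dag_id` and `test_pool` as suggested above. `module_setting` is a hypothetical helper; inside a fixture it would be called as `module_setting(request.module, "test_dag_id")`. Because the suggestion allows functions too, it calls plain callables, but only ordinary functions work that way: pytest raises an error if a function decorated with `@pytest.fixture` is called directly.

```python
import types
from typing import Optional


def module_setting(module: types.ModuleType, name: str, default: Optional[str] = None) -> str:
    """Read a cleanup setting from a test module, e.g. via request.module in a fixture."""
    value = getattr(module, name, default)
    # Plain (non-fixture) functions are allowed: call them to get the actual value.
    if callable(value):
        value = value()
    if not isinstance(value, str) or not value:
        raise ValueError(f"{module.__name__}.{name} must be a non-empty string, got {value!r}")
    return value


# A throwaway module object standing in for a real test module.
fake = types.ModuleType("test_example_dag")
fake.test_dag_id = "example_dag"
fake.test_pool = lambda: "example_pool"
```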

Also, if clean_db only needs to run for a subset of tests, like a group of tests that are all in a subdirectory (even if nested), then it should go into a conftest.py in that directory, so that it doesn't run for tests that don't need it.

@ngken0995
Copy link
Collaborator

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'function'
E       [SQL: DELETE FROM dag_run WHERE dag_run.dag_id = %(dag_id_1)s]
E       [parameters: {'dag_id_1': <function test_dag_id at 0x7f94651632e0>}]
E       (Background on this error at: https://sqlalche.me/e/14/f405)

/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:736: ProgrammingError

I tried using the request fixture, but I get the error above once the value returned by getattr is used in the query. I have created a pull request using an override method instead (link).
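The bound parameter in the traceback is `<function test_dag_id at ...>`: getattr found a function named `test_dag_id` in the module and returned the function object itself, which the database driver cannot turn into a SQL value. A minimal reproduction, using the standard-library sqlite3 driver as a stand-in for psycopg2 (the exception class differs, and sqlite3 itself raises InterfaceError or ProgrammingError depending on the Python version, hence catching `sqlite3.Error`):

```python
import sqlite3


def test_dag_id():  # a function where a plain string constant was expected
    return "example_dag"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT)")
conn.execute("INSERT INTO dag_run VALUES ('example_dag')")

value = test_dag_id  # what getattr(request.module, "test_dag_id") hands back
try:
    conn.execute("DELETE FROM dag_run WHERE dag_id = ?", (value,))
    bound_function = True
except sqlite3.Error:
    bound_function = False  # the driver cannot adapt a function object

# Binding a string (here the function's return value) works as expected.
conn.execute("DELETE FROM dag_run WHERE dag_id = ?", (test_dag_id(),))
remaining = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
```

Defining `test_dag_id = "example_dag"` as a module-level string, rather than as a function or fixture, avoids the problem without any extra call.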

@openverse-bot openverse-bot moved this from 📋 Backlog to ✅ Done in Openverse Backlog Dec 14, 2023