Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TriggerDagRunOperator with deferrable parameter #30406

Merged
merged 8 commits into from
Apr 13, 2023
Merged

Fix TriggerDagRunOperator with deferrable parameter #30406

merged 8 commits into from
Apr 13, 2023

Conversation

dylanbstorey
Copy link
Contributor

@dylanbstorey dylanbstorey commented Mar 31, 2023

Fixes a bug in the TriggerDagRunOperator when operating where deferrable=True.

The cause of the bug was a misunderstanding in which execution date was being brought through with the context + a bad mocked value in the original tests.

The suggested remedy (here) is to have the Trigger instance return a serialized version of itself and then use that to validate the exit status of the external dag.

Functional test below
Screenshot 2023-03-31 181510

from datetime import datetime
from airflow import DAG
from airflow.utils.state import TaskInstanceState, State

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG('driver_dags',
         schedule_interval=None,
         start_date=datetime(2023, 2, 7),
         catchup=False,
         ) as driver_dag:

    driver = TriggerDagRunOperator(
        task_id='driver',
        trigger_dag_id='down_stream',
        reset_dag_run=True,
        wait_for_completion=True,
        poke_interval=20,
        allowed_states=[TaskInstanceState.SUCCESS],
        do_xcom_push=True,
        deferrable=True,
    )


def _dummy_task():
    from time import sleep
    """ Create dummy object"""
    sleep(15)
    return


with DAG('down_stream',  schedule_interval=None,
         start_date=datetime(2023, 1, 23), is_paused_upon_creation=False, catchup=False) as downstream:

    empty = PythonOperator(
        task_id="empty", python_callable=_dummy_task,
        )

@boring-cyborg boring-cyborg bot added the area:core-operators Operators, Sensors and hooks within Core Airflow label Mar 31, 2023
@dylanbstorey dylanbstorey changed the title update triggerer to work correctly BugFix : TriggerDagRunOperator Mar 31, 2023
Copy link
Contributor

@josh-fell josh-fell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. A small question on potentially modifying the unit tests slightly to match the desired exception message.

tests/operators/test_trigger_dagrun.py Outdated Show resolved Hide resolved
tests/operators/test_trigger_dagrun.py Outdated Show resolved Hide resolved
@dylanbstorey dylanbstorey reopened this Apr 5, 2023
@eladkal eladkal changed the title BugFix : TriggerDagRunOperator Fix TriggerDagRunOperator with deferrable parameter Apr 5, 2023
@eladkal eladkal added this to the Airflow 2.5.4 milestone Apr 5, 2023
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Apr 5, 2023
@dylanbstorey
Copy link
Contributor Author

dylanbstorey commented Apr 12, 2023

Finished completing the PR after the UI rebase/merge issues last week, re requesting review and approval from previous individuals to just verify everything one last time.
Local tests and functional tests with an example DAG appear good.

Screenshot 2023-04-11 210459

Copy link
Member

@hussein-awala hussein-awala left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution is based on the result of self.serialize(), I wonder if we can add a test for this method (without mocking it) to make sure that when someone change it, a test for trigger dagrun blocks him until update the code which use it. WDYT?

airflow/triggers/external_task.py Show resolved Hide resolved
@hussein-awala hussein-awala merged commit 5e23de5 into apache:main Apr 13, 2023
@ephraimbuddy ephraimbuddy removed this from the Airflow 2.5.4 milestone Apr 13, 2023
@ephraimbuddy ephraimbuddy added this to the Airflow 2.6.0 milestone Apr 13, 2023
@dylanbstorey dylanbstorey deleted the triggerer-fix branch April 16, 2023 19:48
wookiist pushed a commit to wookiist/airflow that referenced this pull request Apr 19, 2023
* readding after borked it

* pre-commit

* finally fixing after the github issue last week

* push fix

* feedback from hussein
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants