From 05f41093b2f6a8b4b107a3ae93407eb3de80087b Mon Sep 17 00:00:00 2001 From: flolas Date: Fri, 26 Apr 2024 22:38:20 -0400 Subject: [PATCH 1/2] added logical_date parameter --- airflow/operators/trigger_dagrun.py | 62 ++++++----- tests/operators/test_trigger_dagrun.py | 148 ++++++++++++++----------- 2 files changed, 120 insertions(+), 90 deletions(-) diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index ab74d4c8620a52..81c7863770b5fd 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -20,6 +20,7 @@ import datetime import json import time +import warnings from typing import TYPE_CHECKING, Any, Sequence, cast from sqlalchemy import select @@ -27,7 +28,7 @@ from airflow.api.common.trigger_dag import trigger_dag from airflow.configuration import conf -from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists +from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists, RemovedInAirflow3Warning from airflow.models.baseoperator import BaseOperator from airflow.models.baseoperatorlink import BaseOperatorLink from airflow.models.dag import DagModel @@ -41,7 +42,7 @@ from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType -XCOM_EXECUTION_DATE_ISO = "trigger_execution_date_iso" +XCOM_LOGICAL_DATE_ISO = "trigger_logical_date_iso" XCOM_RUN_ID = "trigger_run_id" @@ -64,7 +65,7 @@ class TriggerDagRunLink(BaseOperatorLink): def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str: # Fetch the correct execution date for the triggerED dag which is # stored in xcom during execution of the triggerING task. - when = XCom.get_value(ti_key=ti_key, key=XCOM_EXECUTION_DATE_ISO) + when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO) query = {"dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, "base_date": when} return build_airflow_url_with_query(query) @@ -77,7 +78,7 @@ class TriggerDagRunOperator(BaseOperator): :param trigger_run_id: The run ID to use for the triggered DAG run (templated). If not provided, a run ID will be automatically generated. :param conf: Configuration for the DAG run (templated). - :param execution_date: Execution date for the dag (templated). + :param logical_date: Logical for the dag (templated). :param reset_dag_run: Whether clear existing dag run if already exists. This is useful when backfill or rerun an existing dag run. This only resets (not recreates) the dag run. @@ -91,12 +92,13 @@ class TriggerDagRunOperator(BaseOperator): :param failed_states: List of failed or dis-allowed states, default is ``None``. :param deferrable: If waiting for completion, whether or not to defer the task until done, default is ``False``. + :param execution_date: Deprecated parameter; same as ``logical_date``. """ template_fields: Sequence[str] = ( "trigger_dag_id", "trigger_run_id", - "execution_date", + "logical_date", "conf", "wait_for_completion", ) @@ -110,13 +112,14 @@ def __init__( trigger_dag_id: str, trigger_run_id: str | None = None, conf: dict | None = None, - execution_date: str | datetime.datetime | None = None, + logical_date: str | datetime.datetime | None = None, reset_dag_run: bool = False, wait_for_completion: bool = False, poke_interval: int = 60, allowed_states: list[str] | None = None, failed_states: list[str] | None = None, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + execution_date: str | datetime.datetime | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -136,20 +139,29 @@ def __init__( self.failed_states = [DagRunState.FAILED] self._defer = deferrable - if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)): + if execution_date is not None: + warnings.warn( + "Parameter 'execution_date' is deprecated. Use 'logical_date' instead.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + logical_date = execution_date + + if logical_date is not None and not isinstance(logical_date, (str, datetime.datetime)): + type_name = type(logical_date).__name__ raise TypeError( - f"Expected str or datetime.datetime type for execution_date.Got {type(execution_date)}" + f"Expected str or datetime.datetime type for parameter 'logical_date'. Got {type_name}" ) - self.execution_date = execution_date + self.logical_date = logical_date def execute(self, context: Context): - if isinstance(self.execution_date, datetime.datetime): - parsed_execution_date = self.execution_date - elif isinstance(self.execution_date, str): - parsed_execution_date = timezone.parse(self.execution_date) + if isinstance(self.logical_date, datetime.datetime): + parsed_logical_date = self.logical_date + elif isinstance(self.logical_date, str): + parsed_logical_date = timezone.parse(self.logical_date) else: - parsed_execution_date = timezone.utcnow() + parsed_logical_date = timezone.utcnow() try: json.dumps(self.conf) @@ -159,20 +171,20 @@ def execute(self, context: Context): if self.trigger_run_id: run_id = str(self.trigger_run_id) else: - run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date) + run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date) try: dag_run = trigger_dag( dag_id=self.trigger_dag_id, run_id=run_id, conf=self.conf, - execution_date=parsed_execution_date, + execution_date=parsed_logical_date, replace_microseconds=False, ) except DagRunAlreadyExists as e: if self.reset_dag_run: - self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_execution_date) + self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_logical_date) # Get target dag object and call clear() dag_model = DagModel.get_current(self.trigger_dag_id) @@ -182,7 +194,7 @@ def execute(self, context: Context): dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True) dag = dag_bag.get_dag(self.trigger_dag_id) dag_run = e.dag_run - dag.clear(start_date=dag_run.execution_date, end_date=dag_run.execution_date) + dag.clear(start_date=dag_run.logical_date, end_date=dag_run.logical_date) else: raise e if dag_run is None: @@ -190,7 +202,7 @@ def execute(self, context: Context): # Store the execution date from the dag run (either created or found above) to # be used when creating the extra link on the webserver. ti = context["task_instance"] - ti.xcom_push(key=XCOM_EXECUTION_DATE_ISO, value=dag_run.execution_date.isoformat()) + ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, value=dag_run.logical_date.isoformat()) ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id) if self.wait_for_completion: @@ -200,7 +212,7 @@ def execute(self, context: Context): trigger=DagStateTrigger( dag_id=self.trigger_dag_id, states=self.allowed_states + self.failed_states, - execution_dates=[parsed_execution_date], + execution_dates=[parsed_logical_date], poll_interval=self.poke_interval, ), method_name="execute_complete", @@ -210,7 +222,7 @@ def execute(self, context: Context): self.log.info( "Waiting for %s on %s to become allowed state %s ...", self.trigger_dag_id, - dag_run.execution_date, + dag_run.logical_date, self.allowed_states, ) time.sleep(self.poke_interval) @@ -225,17 +237,17 @@ def execute(self, context: Context): @provide_session def execute_complete(self, context: Context, session: Session, event: tuple[str, dict[str, Any]]): - # This execution date is parsed from the return trigger event - provided_execution_date = event[1]["execution_dates"][0] + # This logical_date is parsed from the return trigger event + provided_logical_date = event[1]["execution_dates"][0] try: dag_run = session.execute( select(DagRun).where( - DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == provided_execution_date + DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == provided_logical_date ) ).scalar_one() except NoResultFound: raise AirflowException( - f"No DAG run found for DAG {self.trigger_dag_id} and execution date {self.execution_date}" + f"No DAG run found for DAG {self.trigger_dag_id} and logical date {self.logical_date}" ) state = dag_run.state diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index a90f49926e5ff7..9eed9b786ea9fe 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -110,7 +110,7 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session): args, _ = mock_build_url.call_args expected_args = { "dag_id": triggered_dag_run.dag_id, - "base_date": triggered_dag_run.execution_date.isoformat(), + "base_date": triggered_dag_run.logical_date.isoformat(), } assert expected_args in args @@ -122,7 +122,7 @@ def test_trigger_dagrun(self): with create_session() as session: dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() assert dagrun.external_trigger - assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.execution_date) + assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.logical_date) self.assert_extra_link(dagrun, task, session) def test_trigger_dagrun_custom_run_id(self): @@ -139,13 +139,13 @@ def test_trigger_dagrun_custom_run_id(self): assert len(dagruns) == 1 assert dagruns[0].run_id == "custom_run_id" - def test_trigger_dagrun_with_execution_date(self): - """Test TriggerDagRunOperator with custom execution_date.""" - custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5) + def test_trigger_dagrun_with_logical_date(self): + """Test TriggerDagRunOperator with custom logical_date.""" + custom_logical_date = timezone.datetime(2021, 1, 2, 3, 4, 5) task = TriggerDagRunOperator( - task_id="test_trigger_dagrun_with_execution_date", + task_id="test_trigger_dagrun_with_logical_date", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=custom_execution_date, + logical_date=custom_logical_date, dag=self.dag, ) task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) @@ -153,17 +153,17 @@ def test_trigger_dagrun_with_execution_date(self): with create_session() as session: dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() assert dagrun.external_trigger - assert dagrun.execution_date == custom_execution_date - assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date) + assert dagrun.logical_date == custom_logical_date + assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_logical_date) self.assert_extra_link(dagrun, task, session) def test_trigger_dagrun_twice(self): - """Test TriggerDagRunOperator with custom execution_date.""" + """Test TriggerDagRunOperator with custom logical_date.""" utc_now = timezone.utcnow() task = TriggerDagRunOperator( - task_id="test_trigger_dagrun_with_execution_date", + task_id="test_trigger_dagrun_with_logical_date", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=utc_now, + logical_date=utc_now, dag=self.dag, poke_interval=1, reset_dag_run=True, @@ -186,16 +186,16 @@ def test_trigger_dagrun_twice(self): assert len(dagruns) == 1 triggered_dag_run = dagruns[0] assert triggered_dag_run.external_trigger - assert triggered_dag_run.execution_date == utc_now + assert triggered_dag_run.logical_date == utc_now self.assert_extra_link(triggered_dag_run, task, session) def test_trigger_dagrun_with_scheduled_dag_run(self): - """Test TriggerDagRunOperator with custom execution_date and scheduled dag_run.""" + """Test TriggerDagRunOperator with custom logical_date and scheduled dag_run.""" utc_now = timezone.utcnow() task = TriggerDagRunOperator( - task_id="test_trigger_dagrun_with_execution_date", + task_id="test_trigger_dagrun_with_logical_date", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=utc_now, + logical_date=utc_now, dag=self.dag, poke_interval=1, reset_dag_run=True, @@ -218,15 +218,15 @@ def test_trigger_dagrun_with_scheduled_dag_run(self): assert len(dagruns) == 1 triggered_dag_run = dagruns[0] assert triggered_dag_run.external_trigger - assert triggered_dag_run.execution_date == utc_now + assert triggered_dag_run.logical_date == utc_now self.assert_extra_link(triggered_dag_run, task, session) - def test_trigger_dagrun_with_templated_execution_date(self): - """Test TriggerDagRunOperator with templated execution_date.""" + def test_trigger_dagrun_with_templated_logical_date(self): + """Test TriggerDagRunOperator with templated logical_date.""" task = TriggerDagRunOperator( - task_id="test_trigger_dagrun_with_str_execution_date", + task_id="test_trigger_dagrun_with_str_logical_date", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date="{{ logical_date }}", + logical_date="{{ logical_date }}", dag=self.dag, ) task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) @@ -236,13 +236,13 @@ def test_trigger_dagrun_with_templated_execution_date(self): assert len(dagruns) == 1 triggered_dag_run = dagruns[0] assert triggered_dag_run.external_trigger - assert triggered_dag_run.execution_date == DEFAULT_DATE + assert triggered_dag_run.logical_date == DEFAULT_DATE self.assert_extra_link(triggered_dag_run, task, session) def test_trigger_dagrun_operator_conf(self): """Test passing conf to the triggered DagRun.""" task = TriggerDagRunOperator( - task_id="test_trigger_dagrun_with_str_execution_date", + task_id="test_trigger_dagrun_with_str_logical_date", trigger_dag_id=TRIGGERED_DAG_ID, conf={"foo": "bar"}, dag=self.dag, @@ -268,7 +268,7 @@ def test_trigger_dagrun_operator_templated_invalid_conf(self): def test_trigger_dagrun_operator_templated_conf(self): """Test passing a templated conf to the triggered DagRun.""" task = TriggerDagRunOperator( - task_id="test_trigger_dagrun_with_str_execution_date", + task_id="test_trigger_dagrun_with_str_logical_date", trigger_dag_id=TRIGGERED_DAG_ID, conf={"foo": "{{ dag.dag_id }}"}, dag=self.dag, @@ -282,48 +282,48 @@ def test_trigger_dagrun_operator_templated_conf(self): def test_trigger_dagrun_with_reset_dag_run_false(self): """Test TriggerDagRunOperator without reset_dag_run.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, trigger_run_id=None, - execution_date=None, + logical_date=None, reset_dag_run=False, dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date, ignore_ti_state=True) - task.run(start_date=execution_date, end_date=execution_date, ignore_ti_state=True) + task.run(start_date=logical_date, end_date=logical_date, ignore_ti_state=True) + task.run(start_date=logical_date, end_date=logical_date, ignore_ti_state=True) with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() assert len(dagruns) == 2 @pytest.mark.parametrize( - "trigger_run_id, trigger_execution_date", + "trigger_run_id, trigger_logical_date", [ (None, DEFAULT_DATE), ("dummy_run_id", None), ("dummy_run_id", DEFAULT_DATE), ], ) - def test_trigger_dagrun_with_reset_dag_run_false_fail(self, trigger_run_id, trigger_execution_date): + def test_trigger_dagrun_with_reset_dag_run_false_fail(self, trigger_run_id, trigger_logical_date): """Test TriggerDagRunOperator without reset_dag_run but triggered dag fails.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, trigger_run_id=trigger_run_id, - execution_date=trigger_execution_date, + logical_date=trigger_logical_date, reset_dag_run=False, dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date, ignore_ti_state=True) + task.run(start_date=logical_date, end_date=logical_date, ignore_ti_state=True) with pytest.raises(DagRunAlreadyExists): - task.run(start_date=execution_date, end_date=execution_date, ignore_ti_state=True) + task.run(start_date=logical_date, end_date=logical_date, ignore_ti_state=True) @pytest.mark.parametrize( - "trigger_run_id, trigger_execution_date, expected_dagruns_count", + "trigger_run_id, trigger_logical_date, expected_dagruns_count", [ (None, DEFAULT_DATE, 1), (None, None, 2), @@ -332,20 +332,20 @@ def test_trigger_dagrun_with_reset_dag_run_false_fail(self, trigger_run_id, trig ], ) def test_trigger_dagrun_with_reset_dag_run_true( - self, trigger_run_id, trigger_execution_date, expected_dagruns_count + self, trigger_run_id, trigger_logical_date, expected_dagruns_count ): """Test TriggerDagRunOperator with reset_dag_run.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, trigger_run_id=trigger_run_id, - execution_date=trigger_execution_date, + logical_date=trigger_logical_date, reset_dag_run=True, dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date, ignore_ti_state=True) - task.run(start_date=execution_date, end_date=execution_date, ignore_ti_state=True) + task.run(start_date=logical_date, end_date=logical_date, ignore_ti_state=True) + task.run(start_date=logical_date, end_date=logical_date, ignore_ti_state=True) with create_session() as session: dag_runs = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() @@ -354,17 +354,17 @@ def test_trigger_dagrun_with_reset_dag_run_true( def test_trigger_dagrun_with_wait_for_completion_true(self): """Test TriggerDagRunOperator with wait_for_completion.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=execution_date, + logical_date=logical_date, wait_for_completion=True, poke_interval=10, allowed_states=[State.QUEUED], dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() @@ -372,28 +372,28 @@ def test_trigger_dagrun_with_wait_for_completion_true(self): def test_trigger_dagrun_with_wait_for_completion_true_fail(self): """Test TriggerDagRunOperator with wait_for_completion but triggered dag fails.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=execution_date, + logical_date=logical_date, wait_for_completion=True, poke_interval=10, failed_states=[State.QUEUED], dag=self.dag, ) with pytest.raises(AirflowException): - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) def test_trigger_dagrun_triggering_itself(self): """Test TriggerDagRunOperator that triggers itself""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=self.dag.dag_id, dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) with create_session() as session: dagruns = ( @@ -407,33 +407,33 @@ def test_trigger_dagrun_triggering_itself(self): assert triggered_dag_run.state == State.QUEUED self.assert_extra_link(triggered_dag_run, task, session) - def test_trigger_dagrun_triggering_itself_with_execution_date(self): - """Test TriggerDagRunOperator that triggers itself with execution date, + def test_trigger_dagrun_triggering_itself_with_logical_date(self): + """Test TriggerDagRunOperator that triggers itself with logical date, fails with DagRunAlreadyExists""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=self.dag.dag_id, - execution_date=execution_date, + logical_date=logical_date, dag=self.dag, ) with pytest.raises(DagRunAlreadyExists): - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) def test_trigger_dagrun_with_wait_for_completion_true_defer_false(self): """Test TriggerDagRunOperator with wait_for_completion.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=execution_date, + logical_date=logical_date, wait_for_completion=True, poke_interval=10, allowed_states=[State.QUEUED], deferrable=False, dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() @@ -441,11 +441,11 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_false(self): def test_trigger_dagrun_with_wait_for_completion_true_defer_true(self): """Test TriggerDagRunOperator with wait_for_completion.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=execution_date, + logical_date=logical_date, wait_for_completion=True, poke_interval=10, allowed_states=[State.QUEUED], @@ -453,7 +453,7 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true(self): dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() @@ -469,11 +469,11 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true(self): def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure(self): """Test TriggerDagRunOperator wait_for_completion dag run in non defined state.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=execution_date, + logical_date=logical_date, wait_for_completion=True, poke_interval=10, allowed_states=[State.SUCCESS], @@ -481,7 +481,7 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure(self): dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() @@ -501,11 +501,11 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure(self): def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self): """Test TriggerDagRunOperator wait_for_completion dag run in failed state.""" - execution_date = DEFAULT_DATE + logical_date = DEFAULT_DATE task = TriggerDagRunOperator( task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, - execution_date=execution_date, + logical_date=logical_date, wait_for_completion=True, poke_interval=10, allowed_states=[State.SUCCESS], @@ -514,7 +514,7 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self) dag=self.dag, ) - task.run(start_date=execution_date, end_date=execution_date) + task.run(start_date=logical_date, end_date=logical_date) with create_session() as session: dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() @@ -529,3 +529,21 @@ def test_trigger_dagrun_with_wait_for_completion_true_defer_true_failure_2(self) with pytest.raises(AirflowException, match="failed with failed state"): task.execute_complete(context={}, event=trigger.serialize()) + + def test_trigger_dagrun_with_execution_date(self): + """Test TriggerDagRunOperator with custom execution_date (deprecated parameter)""" + custom_execution_date = timezone.datetime(2021, 1, 2, 3, 4, 5) + task = TriggerDagRunOperator( + task_id="test_trigger_dagrun_with_execution_date", + trigger_dag_id=TRIGGERED_DAG_ID, + execution_date=custom_execution_date, + dag=self.dag, + ) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + with create_session() as session: + dagrun = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).one() + assert dagrun.external_trigger + assert dagrun.logical_date == custom_execution_date + assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, custom_execution_date) + self.assert_extra_link(dagrun, task, session) From c71e219fcd76bb65ad28b7df1168064f31614f4c Mon Sep 17 00:00:00 2001 From: flolas Date: Sat, 27 Apr 2024 02:04:51 -0400 Subject: [PATCH 2/2] fix comment --- airflow/operators/trigger_dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index 81c7863770b5fd..f8cfa5256a57bd 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -78,7 +78,7 @@ class TriggerDagRunOperator(BaseOperator): :param trigger_run_id: The run ID to use for the triggered DAG run (templated). If not provided, a run ID will be automatically generated. :param conf: Configuration for the DAG run (templated). - :param logical_date: Logical for the dag (templated). + :param logical_date: Logical date for the dag (templated). :param reset_dag_run: Whether clear existing dag run if already exists. This is useful when backfill or rerun an existing dag run. This only resets (not recreates) the dag run.