Allowing DateTimeSensorAsync, FileSensor and TimeSensorAsync to start execution from trigger during dynamic task mapping (#41182)

* docs(deferring): reword the doc to enable dynamic task mapping support to start execution from the trigger feature
* feat(sensors): add trigger_kwargs argument to DateTimeSensorAsync, FileSensor and TimeSensorAsync
    * Add start execution from trigger during dynamic task mapping.
Lee-W authored Aug 1, 2024
1 parent 9f75995 commit b28e8bf
Showing 4 changed files with 83 additions and 58 deletions.
11 changes: 10 additions & 1 deletion airflow/sensors/date_time.py
@@ -87,6 +87,8 @@ class DateTimeSensorAsync(DateTimeSensor):
:param target_time: datetime after which the job succeeds. (templated)
:param start_from_trigger: Start the task directly from the triggerer without going into the worker.
:param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True
during dynamic task mapping. This argument is not used in standard usage.
:param end_from_trigger: End the task directly from the triggerer without going into the worker.
"""

@@ -99,7 +101,14 @@ class DateTimeSensorAsync(DateTimeSensor):
)
start_from_trigger = False

    def __init__(self, *, start_from_trigger: bool = False, end_from_trigger: bool = False, **kwargs) -> None:
    def __init__(
        self,
        *,
        start_from_trigger: bool = False,
        end_from_trigger: bool = False,
        trigger_kwargs: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.end_from_trigger = end_from_trigger

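For illustration only (not part of this diff), a mapped ``DateTimeSensorAsync`` could start straight from the triggerer by passing per-task trigger keyword arguments through ``expand``. This is a sketch under the assumption that the underlying ``DateTimeTrigger`` accepts a ``moment`` argument; the task id and datetimes are placeholders.

.. code-block:: python

    from datetime import datetime, timezone

    from airflow.sensors.date_time import DateTimeSensorAsync

    # Hypothetical mapped usage inside a DAG definition: __init__ is skipped while
    # mapping, and each trigger_kwargs dict is handed directly to the trigger class.
    DateTimeSensorAsync.partial(task_id="wait_for_moments", start_from_trigger=True).expand(
        trigger_kwargs=[
            {"moment": datetime(2024, 8, 1, 12, 0, tzinfo=timezone.utc)},  # assumed DateTimeTrigger kwarg
            {"moment": datetime(2024, 8, 1, 18, 0, tzinfo=timezone.utc)},
        ]
    )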
5 changes: 4 additions & 1 deletion airflow/sensors/filesystem.py
@@ -21,7 +21,7 @@
import os
from functools import cached_property
from glob import glob
from typing import TYPE_CHECKING, Sequence
from typing import TYPE_CHECKING, Any, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException
@@ -50,6 +50,8 @@ class FileSensor(BaseSensorOperator):
:param deferrable: If waiting for completion, whether to defer the task until done,
default is ``False``.
:param start_from_trigger: Start the task directly from the triggerer without going into the worker.
:param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True
during dynamic task mapping. This argument is not used in standard usage.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
@@ -77,6 +79,7 @@ def __init__(
        recursive=False,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        start_from_trigger: bool = False,
        trigger_kwargs: dict[str, Any] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
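Similarly illustrative (not part of this diff), a mapped ``FileSensor`` might fan out over several paths. The sketch assumes the backing ``FileTrigger`` accepts a ``filepath`` keyword; the paths and task id are placeholders.

.. code-block:: python

    from airflow.sensors.filesystem import FileSensor

    # Hypothetical mapped usage inside a DAG definition. The filepath passed to
    # partial() is only a fallback used if the task is later sent back to a worker;
    # each per-task trigger_kwargs dict is what the trigger itself receives.
    FileSensor.partial(
        task_id="wait_for_files",
        filepath="/data/incoming",  # placeholder fallback value
        start_from_trigger=True,
    ).expand(
        trigger_kwargs=[
            {"filepath": "/data/incoming/a.csv"},  # assumed FileTrigger kwarg
            {"filepath": "/data/incoming/b.csv"},
        ]
    )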
6 changes: 5 additions & 1 deletion airflow/sensors/time_sensor.py
@@ -57,7 +57,10 @@ class TimeSensorAsync(BaseSensorOperator):
This frees up a worker slot while it is waiting.
:param target_time: time after which the job succeeds
:param start_from_trigger: Start the task directly from the triggerer without going into the worker.
:param end_from_trigger: End the task directly from the triggerer without going into the worker.
:param trigger_kwargs: The keyword arguments passed to the trigger when start_from_trigger is set to True
during dynamic task mapping. This argument is not used in standard usage.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
@@ -76,9 +79,10 @@ class TimeSensorAsync(BaseSensorOperator):
    def __init__(
        self,
        *,
        target_time: datetime.time,
        start_from_trigger: bool = False,
        trigger_kwargs: dict[str, Any] | None = None,
        end_from_trigger: bool = False,
        target_time: datetime.time,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
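For contrast (again illustrative, not from the diff), in ordinary non-mapped usage ``trigger_kwargs`` stays unset and ``start_from_trigger`` alone moves the wait onto the triggerer:

.. code-block:: python

    from datetime import time

    from airflow.sensors.time_sensor import TimeSensorAsync

    # Hypothetical standard (non-mapped) usage inside a DAG definition:
    # trigger_kwargs is only relevant when the sensor is dynamically mapped.
    wait_until_noon = TimeSensorAsync(
        task_id="wait_until_noon",
        target_time=time(12, 0),
        start_from_trigger=True,
    )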
119 changes: 64 additions & 55 deletions docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -97,59 +97,6 @@ When writing a deferrable operators these are the main points to consider:
return
Exiting deferred task from Triggers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. versionadded:: 2.10.0

If you want to exit your task directly from the triggerer without going into the worker, you can set the instance-level attribute ``end_from_trigger`` on your deferrable operator, as discussed above. This saves the resources needed to start a new worker.

Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task.

.. code-block:: python
    class WaitFiveHourSensorAsync(BaseSensorOperator):
        # this sensor always exits from trigger.
        def __init__(self, **kwargs) -> None:
            super().__init__(**kwargs)
            self.end_from_trigger = True

        def execute(self, context: Context) -> NoReturn:
            self.defer(
                method_name=None,
                trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
            )
``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events:

.. code-block:: python
    class WaitFiveHourTrigger(BaseTrigger):
        def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
            super().__init__()
            self.duration = duration
            self.end_from_trigger = end_from_trigger

        def serialize(self) -> tuple[str, dict[str, Any]]:
            return (
                "your_module.WaitFiveHourTrigger",
                {"duration": self.duration, "end_from_trigger": self.end_from_trigger},
            )

        async def run(self) -> AsyncIterator[TriggerEvent]:
            await asyncio.sleep(self.duration.total_seconds())
            if self.end_from_trigger:
                yield TaskSuccessEvent()
            else:
                yield TriggerEvent({"duration": self.duration})
In the above example, if ``end_from_trigger`` is set to ``True``, the trigger ends the task instance directly by yielding ``TaskSuccessEvent``. Otherwise, it resumes the task instance with the method specified in the operator.

.. note::
    Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when a deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated, DAG parsing raises an exception to indicate this limitation. While writing a custom trigger, ensure that the trigger is not set to end the task instance directly if listeners are added from plugins. If the ``end_from_trigger`` attribute is renamed to a different attribute by the author of the trigger, DAG parsing will not raise any exception and listeners that depend on this task will not work. This limitation will be addressed in future releases.

Writing Triggers
~~~~~~~~~~~~~~~~

@@ -345,7 +292,11 @@ In the sensor part, we'll need to provide the path to ``HourDeltaTrigger`` as ``
# We have no more work to do here. Mark as complete.
return
To enable Dynamic Task Mapping support, you can define ``start_from_trigger`` and ``trigger_kwargs`` in the parameter of "__init__". **Note that you don't need to define both of them to use this feature, but you do need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not work. Also, this works different from mapping an operator without ``start_from_trigger`` support. The whole ``__init__`` method will be skipped when mapping an operator whose ``start_from_trigger`` is set to True. Only argument ``trigger_kwargs`` is used and passed into ``trigger_cls``.
The initialization stage of mapped tasks occurs after the scheduler submits them to the executor. Thus, this feature offers limited dynamic task mapping support and its usage differs from standard practices. To enable dynamic task mapping support, you need to define ``start_from_trigger`` and ``trigger_kwargs`` in the ``__init__`` method. **Note that you don't need to define both of them to use this feature, but you need to use the exact same parameter name.** For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not have any effect. The entire ``__init__`` method will be skipped when mapping a task whose ``start_from_trigger`` is set to True. The scheduler will use the ``start_from_trigger`` and ``trigger_kwargs`` provided through ``partial`` and ``expand`` (falling back to the class attributes if not provided) to determine whether and how to submit tasks to the executor or the triggerer. Note that XCom values won't be resolved at this stage.
After the trigger has finished executing, the task may be sent back to the worker to execute the ``next_method``, or the task instance may end directly (refer to :ref:`Exiting deferred task from Triggers <deferring/exiting_from_trigger>`). If the task is sent back to the worker, the arguments in the ``__init__`` method will still take effect before the ``next_method`` is executed, but they will not affect the execution of the trigger.
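A minimal sketch of such a sensor follows. It assumes a hypothetical ``WaitHoursTrigger`` importable as ``your_module.WaitHoursTrigger`` that accepts an ``hours`` keyword, and the ``StartTriggerArgs`` dataclass from ``airflow.triggers.base``.

.. code-block:: python

    from typing import Any

    from airflow.sensors.base import BaseSensorOperator
    from airflow.triggers.base import StartTriggerArgs


    class WaitHoursSensor(BaseSensorOperator):
        # Class-level defaults: the scheduler reads these when mapping skips __init__.
        start_trigger_args = StartTriggerArgs(
            trigger_cls="your_module.WaitHoursTrigger",  # hypothetical trigger
            trigger_kwargs={"hours": 1},
            next_method="execute_complete",
            next_kwargs=None,
            timeout=None,
        )
        start_from_trigger = True

        def __init__(
            self,
            *,
            start_from_trigger: bool = True,
            trigger_kwargs: dict[str, Any] | None = None,
            **kwargs,
        ) -> None:
            # Only reached when the task is not mapped with start_from_trigger,
            # or when it is later sent back to a worker.
            super().__init__(**kwargs)
            self.start_from_trigger = start_from_trigger
            self.start_trigger_args.trigger_kwargs = trigger_kwargs or {"hours": 1}

        def execute_complete(self, context, event=None) -> None:
            # We have no more work to do here. Mark as complete.
            return

When such a sensor is mapped, the ``partial``/``expand`` call shown further below supplies per-task ``trigger_kwargs`` without ever running this ``__init__``.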


.. code-block:: python
Expand Down Expand Up @@ -384,14 +335,72 @@ To enable Dynamic Task Mapping support, you can define ``start_from_trigger`` an
# We have no more work to do here. Mark as complete.
return
These parameters can be mapped using the ``expand`` and ``partial`` methods. Note that XCom values won't be resolved at this stage.
This will be expanded into 2 tasks, with their "hours" arguments set to 1 and 2 respectively.

.. code-block:: python
    WaitHoursSensor.partial(task_id="wait_for_n_hours", start_from_trigger=True).expand(
        trigger_kwargs=[{"hours": 1}, {"hours": 2}]
    )
.. _deferring/exiting_from_trigger:

Exiting deferred task from Triggers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. versionadded:: 2.10.0

If you want to exit your task directly from the triggerer without going into the worker, you can set the instance-level attribute ``end_from_trigger`` on your deferrable operator, as discussed above. This saves the resources needed to start a new worker.

Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the ``method_name`` that should be used when resuming execution in the task.

.. code-block:: python
    class WaitFiveHourSensorAsync(BaseSensorOperator):
        # this sensor always exits from trigger.
        def __init__(self, **kwargs) -> None:
            super().__init__(**kwargs)
            self.end_from_trigger = True

        def execute(self, context: Context) -> NoReturn:
            self.defer(
                method_name=None,
                trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
            )
``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events:

.. code-block:: python
    class WaitFiveHourTrigger(BaseTrigger):
        def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
            super().__init__()
            self.duration = duration
            self.end_from_trigger = end_from_trigger

        def serialize(self) -> tuple[str, dict[str, Any]]:
            return (
                "your_module.WaitFiveHourTrigger",
                {"duration": self.duration, "end_from_trigger": self.end_from_trigger},
            )

        async def run(self) -> AsyncIterator[TriggerEvent]:
            await asyncio.sleep(self.duration.total_seconds())
            if self.end_from_trigger:
                yield TaskSuccessEvent()
            else:
                yield TriggerEvent({"duration": self.duration})
In the above example, if ``end_from_trigger`` is set to ``True``, the trigger ends the task instance directly by yielding ``TaskSuccessEvent``. Otherwise, it resumes the task instance with the method specified in the operator.
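For comparison, here is a minimal sketch (hypothetical handler body) of the variant that sends execution back to a worker instead of ending from the trigger, building on the ``WaitFiveHourTrigger`` defined above:

.. code-block:: python

    class WaitFiveHourSensorAsync(BaseSensorOperator):
        # This variant resumes on a worker once the trigger fires.
        def execute(self, context: Context) -> None:
            self.defer(
                method_name="execute_complete",
                trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=False),
            )

        def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
            # event carries the payload yielded by the trigger's TriggerEvent.
            self.log.info("Waited for %s", event["duration"] if event else "an unknown duration")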

.. note::
    Exiting from the trigger works only when listeners are not integrated for the deferrable operator. Currently, when a deferrable operator has the ``end_from_trigger`` attribute set to ``True`` and listeners are integrated, DAG parsing raises an exception to indicate this limitation. While writing a custom trigger, ensure that the trigger is not set to end the task instance directly if listeners are added from plugins. If the ``end_from_trigger`` attribute is renamed to a different attribute by the author of the trigger, DAG parsing will not raise any exception and listeners that depend on this task will not work. This limitation will be addressed in future releases.


High Availability
-----------------

