Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename TaskMixin to DependencyMixin #20297

Merged
merged 4 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 18 additions & 24 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
from airflow.models.param import ParamsDict
from airflow.models.pool import Pool
from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances
from airflow.models.taskmixin import TaskMixin
from airflow.models.taskmixin import DependencyMixin
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
Expand All @@ -80,7 +80,6 @@

if TYPE_CHECKING:
from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.utils.task_group import TaskGroup

ScheduleInterval = Union[str, timedelta, relativedelta]
Expand Down Expand Up @@ -206,7 +205,7 @@ def __new__(cls, name, bases, namespace, **kwargs):


@functools.total_ordering
class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta):
class BaseOperator(Operator, LoggingMixin, DependencyMixin, metaclass=BaseOperatorMeta):
"""
Abstract base class for all operators. Since operators create objects that
become nodes in the dag, BaseOperator contains many recursive methods for
Expand Down Expand Up @@ -1412,7 +1411,7 @@ def leaves(self) -> List["BaseOperator"]:

def _set_relatives(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
upstream: bool = False,
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
Expand All @@ -1424,13 +1423,12 @@ def _set_relatives(
for task_object in task_or_task_list:
task_object.update_relative(self, not upstream)
relatives = task_object.leaves if upstream else task_object.roots
task_list.extend(relatives)

for task in task_list:
if not isinstance(task, BaseOperator):
raise AirflowException(
f"Relationships can only be set between Operators; received {task.__class__.__name__}"
)
for task in relatives:
if not isinstance(task, BaseOperator):
raise AirflowException(
f"Relationships can only be set between Operators; received {task.__class__.__name__}"
)
task_list.append(task)

# relationships can only be set if the tasks share a single DAG. Tasks
# without a DAG are assigned to that DAG.
Expand Down Expand Up @@ -1475,7 +1473,7 @@ def _set_relatives(

def set_downstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
"""
Expand All @@ -1486,7 +1484,7 @@ def set_downstream(

def set_upstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
edge_modifier: Optional[EdgeModifier] = None,
) -> None:
"""
Expand Down Expand Up @@ -1662,10 +1660,11 @@ def defer(
raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout)


Chainable = Union[BaseOperator, "XComArg", EdgeModifier, "TaskGroup"]
# TODO: Deprecate for Airflow 3.0
Chainable = Union[DependencyMixin, Sequence[DependencyMixin]]
Comment on lines +1663 to +1664
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it’s actually a good idea to keep using this name…? It’s unfortunately not super easy to deprecate a type alias.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this alias is used anywhere beyond the usage a few lines below.



def chain(*tasks: Union[Chainable, Sequence[Chainable]]) -> None:
def chain(*tasks: Union[DependencyMixin, Sequence[DependencyMixin]]) -> None:
r"""
Given a number of tasks, builds a dependency chain.

Expand Down Expand Up @@ -1776,17 +1775,12 @@ def chain(*tasks: Union[Chainable, Sequence[Chainable]]) -> None:
List[airflow.utils.EdgeModifier], airflow.utils.EdgeModifier, List[airflow.models.XComArg], XComArg,
List[airflow.utils.TaskGroup], or airflow.utils.TaskGroup
"""
from airflow.models.xcom_arg import XComArg
from airflow.utils.task_group import TaskGroup

chainable_types = (BaseOperator, XComArg, EdgeModifier, TaskGroup)

for index, up_task in enumerate(tasks[:-1]):
down_task = tasks[index + 1]
if isinstance(up_task, chainable_types):
if isinstance(up_task, DependencyMixin):
up_task.set_downstream(down_task)
continue
if isinstance(down_task, chainable_types):
if isinstance(down_task, DependencyMixin):
down_task.set_upstream(up_task)
continue
if not isinstance(up_task, Sequence) or not isinstance(down_task, Sequence):
Expand All @@ -1803,8 +1797,8 @@ def chain(*tasks: Union[Chainable, Sequence[Chainable]]) -> None:


def cross_downstream(
from_tasks: Sequence[Union[BaseOperator, "XComArg"]],
to_tasks: Union[BaseOperator, "XComArg", Sequence[Union[BaseOperator, "XComArg"]]],
from_tasks: Sequence[DependencyMixin],
to_tasks: Union[DependencyMixin, Sequence[DependencyMixin]],
):
r"""
Set downstream dependencies for all tasks in from_tasks to all tasks in to_tasks.
Expand Down
53 changes: 34 additions & 19 deletions airflow/models/taskmixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,76 @@
# specific language governing permissions and limitations
ashb marked this conversation as resolved.
Show resolved Hide resolved
dstandish marked this conversation as resolved.
Show resolved Hide resolved
# under the License.

import warnings
from abc import abstractmethod
from typing import Sequence, Union


class TaskMixin:
"""
Mixing implementing common chain methods like >> and <<.

In the following functions we use:
Task = Union[BaseOperator, XComArg]
No type annotations due to cyclic imports.
"""
class DependencyMixin:
"""Mixing implementing common dependency setting methods methods like >> and <<."""

@property
def roots(self):
"""Should return list of root operator List[BaseOperator]"""
def roots(self) -> Sequence["DependencyMixin"]:
"""
List of root nodes -- ones with no upstream dependencies.

a.k.a. the "start" of this sub-graph
"""
raise NotImplementedError()

@property
def leaves(self):
"""Should return list of leaf operator List[BaseOperator]"""
def leaves(self) -> Sequence["DependencyMixin"]:
"""
List of leaf nodes -- ones with only upstream dependencies.

a.k.a. the "end" of this sub-graph
"""
raise NotImplementedError()

@abstractmethod
def set_upstream(self, other: Union["TaskMixin", Sequence["TaskMixin"]]):
def set_upstream(self, other: Union["DependencyMixin", Sequence["DependencyMixin"]]):
"""Set a task or a task list to be directly upstream from the current task."""
raise NotImplementedError()

@abstractmethod
def set_downstream(self, other: Union["TaskMixin", Sequence["TaskMixin"]]):
def set_downstream(self, other: Union["DependencyMixin", Sequence["DependencyMixin"]]):
"""Set a task or a task list to be directly downstream from the current task."""
raise NotImplementedError()

def update_relative(self, other: "TaskMixin", upstream=True) -> None:
def update_relative(self, other: "DependencyMixin", upstream=True) -> None:
"""
Update relationship information about another TaskMixin. Default is no-op.
Override if necessary.
"""

def __lshift__(self, other: Union["TaskMixin", Sequence["TaskMixin"]]):
def __lshift__(self, other: Union["DependencyMixin", Sequence["DependencyMixin"]]):
"""Implements Task << Task"""
self.set_upstream(other)
return other

def __rshift__(self, other: Union["TaskMixin", Sequence["TaskMixin"]]):
def __rshift__(self, other: Union["DependencyMixin", Sequence["DependencyMixin"]]):
"""Implements Task >> Task"""
self.set_downstream(other)
return other

def __rrshift__(self, other: Union["TaskMixin", Sequence["TaskMixin"]]):
def __rrshift__(self, other: Union["DependencyMixin", Sequence["DependencyMixin"]]):
"""Called for Task >> [Task] because list don't have __rshift__ operators."""
self.__lshift__(other)
return self

def __rlshift__(self, other: Union["TaskMixin", Sequence["TaskMixin"]]):
def __rlshift__(self, other: Union["DependencyMixin", Sequence["DependencyMixin"]]):
"""Called for Task << [Task] because list don't have __lshift__ operators."""
self.__rshift__(other)
return self


class TaskMixin(DependencyMixin):
""":meta private:"""

def __init_subclass__(cls) -> None:
dstandish marked this conversation as resolved.
Show resolved Hide resolved
warnings.warn(
f"TaskMixin has been renamed to DependencyMixin, please update {cls.__name__}",
category=DeprecationWarning,
stacklevel=2,
)
return super().__init_subclass__()
8 changes: 4 additions & 4 deletions airflow/models/xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskmixin import TaskMixin
from airflow.models.taskmixin import DependencyMixin
from airflow.models.xcom import XCOM_RETURN_KEY
from airflow.utils.edgemodifier import EdgeModifier

if TYPE_CHECKING:
from airflow.utils.context import Context


class XComArg(TaskMixin):
class XComArg(DependencyMixin):
"""
Class that represents a XCom push from a previous operator.
Defaults to "return_value" as only key.
Expand Down Expand Up @@ -116,15 +116,15 @@ def key(self) -> str:

def set_upstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
edge_modifier: Optional[EdgeModifier] = None,
):
"""Proxy to underlying operator set_upstream method. Required by TaskMixin."""
self.operator.set_upstream(task_or_task_list, edge_modifier)

def set_downstream(
self,
task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
edge_modifier: Optional[EdgeModifier] = None,
):
"""Proxy to underlying operator set_downstream method. Required by TaskMixin."""
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/dot_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airflow.models import TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.taskmixin import TaskMixin
from airflow.models.taskmixin import DependencyMixin
from airflow.serialization.serialized_objects import DagDependency
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
Expand Down Expand Up @@ -111,7 +111,7 @@ def _draw_task_group(


def _draw_nodes(
node: TaskMixin, parent_graph: graphviz.Digraph, states_by_task_id: Optional[Dict[Any, Any]]
node: DependencyMixin, parent_graph: graphviz.Digraph, states_by_task_id: Dict[str, str]
) -> None:
"""Draw the node and its children on the given parent_graph recursively."""
if isinstance(node, BaseOperator):
Expand Down
52 changes: 32 additions & 20 deletions airflow/utils/edgemodifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, List, Optional, Sequence, Union

from typing import List, Optional, Sequence, Union
from airflow.models.taskmixin import DependencyMixin

from airflow.models.taskmixin import TaskMixin
if TYPE_CHECKING:
from airflow.models.baseoperator import BaseOperator


class EdgeModifier(TaskMixin):
class EdgeModifier(DependencyMixin):
"""
Class that represents edge information to be added between two
tasks/operators. Has shorthand factory functions, like Label("hooray").
Expand All @@ -39,36 +41,39 @@ class EdgeModifier(TaskMixin):
"""

def __init__(self, label: Optional[str] = None):
from airflow.models.baseoperator import BaseOperator

self.label = label
self._upstream: List[BaseOperator] = []
self._downstream: List[BaseOperator] = []
self._upstream: List["BaseOperator"] = []
self._downstream: List["BaseOperator"] = []

@property
def roots(self):
"""Should return list of root operator List["BaseOperator"]"""
return self._downstream

@property
def leaves(self):
"""Should return list of leaf operator List["BaseOperator"]"""
return self._upstream

def set_upstream(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]], chain: bool = True):
def set_upstream(
self, task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]], chain: bool = True
):
"""
Sets the given task/list onto the upstream attribute, and then checks if
we have both sides so we can resolve the relationship.

Providing this also provides << via TaskMixin.
Providing this also provides << via DependencyMixin.
"""
from airflow.models.baseoperator import BaseOperator

# Ensure we have a list, even if it's just one item
if isinstance(task_or_task_list, TaskMixin):
if isinstance(task_or_task_list, DependencyMixin):
task_or_task_list = [task_or_task_list]
# Unfurl it into actual operators
operators = []
operators: List[BaseOperator] = []
for task in task_or_task_list:
operators.extend(task.roots)
for root in task.roots:
if not isinstance(root, BaseOperator):
raise TypeError(f"Cannot use edge labels with {type(root).__name__}, only operators")
operators.append(root)
# For each already-declared downstream, pair off with each new upstream
# item and store the edge info.
for operator in operators:
Expand All @@ -79,20 +84,27 @@ def set_upstream(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
# Add the new tasks to our list of ones we've seen
self._upstream.extend(operators)

def set_downstream(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]], chain: bool = True):
def set_downstream(
self, task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]], chain: bool = True
):
"""
Sets the given task/list onto the downstream attribute, and then checks if
we have both sides so we can resolve the relationship.

Providing this also provides >> via TaskMixin.
Providing this also provides >> via DependencyMixin.
"""
from airflow.models.baseoperator import BaseOperator

# Ensure we have a list, even if it's just one item
if isinstance(task_or_task_list, TaskMixin):
if isinstance(task_or_task_list, DependencyMixin):
task_or_task_list = [task_or_task_list]
# Unfurl it into actual operators
operators = []
operators: List[BaseOperator] = []
for task in task_or_task_list:
operators.extend(task.leaves)
for leaf in task.leaves:
if not isinstance(leaf, BaseOperator):
raise TypeError(f"Cannot use edge labels with {type(leaf).__name__}, only operators")
operators.append(leaf)
# Pair them off with existing
for operator in operators:
for upstream in self._upstream:
Expand All @@ -102,7 +114,7 @@ def set_downstream(self, task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]
# Add the new tasks to our list of ones we've seen
self._downstream.extend(operators)

def update_relative(self, other: "TaskMixin", upstream: bool = True) -> None:
def update_relative(self, other: DependencyMixin, upstream: bool = True) -> None:
"""
Called if we're not the "main" side of a relationship; we still run the
same logic, though.
Expand Down
Loading