Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Refactored __new__ magic method of BaseOperatorMeta to avoid bad mixing classic and decorated operators #37937

Merged
merged 96 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
ba4d6a7
refactor: Refactored __new__ magic method of BaseOperatorMeta class t…
davidblain-infrabel Mar 6, 2024
78408f4
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 6, 2024
7856c57
refactor: Get unit_test_mode from airflow config instead of directly …
davidblain-infrabel Mar 6, 2024
6dfdc07
fix: Fixed import of AirflowException
davidblain-infrabel Mar 6, 2024
20f3a06
refactor: Use conf_vars instead of patch.dict to alter Airflow unit_t…
davidblain-infrabel Mar 6, 2024
9f2935c
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 6, 2024
93d72f0
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 6, 2024
5a4f705
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 7, 2024
5ef3a2a
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 7, 2024
b72cfe8
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 7, 2024
657c54a
fix: Fixed match on expected exception message
davidblain-infrabel Mar 6, 2024
b230f7f
refactor: Added test case where HelloWorldOperator is called from wit…
davidblain-infrabel Mar 7, 2024
1783b89
refactor: Refactored TestBaseOperatorMeta by having a dedicated fixtu…
davidblain-infrabel Mar 7, 2024
2baa91e
refactor: Reformatted some files that where failing due to static checks
davidblain-infrabel Mar 7, 2024
413efde
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 8, 2024
55c3038
refactor: Try disabling unit_test_mode right just before calling the …
davidblain-infrabel Mar 8, 2024
abebb14
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 8, 2024
e2941a4
refactor: Re-ordered imports as asked by static checks
davidblain-infrabel Mar 8, 2024
00ede06
refactor: Added license at top of new test module
davidblain-infrabel Mar 8, 2024
1d958ff
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 8, 2024
345817e
refactor: Refactored ExecutorSafeguard decorator as a class so we can…
davidblain-infrabel Mar 11, 2024
43842e4
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 11, 2024
d33730c
refactor: Removed patch of sqlalchemy Session on test methods
davidblain-infrabel Mar 11, 2024
fabea74
refactor: Refactored ExecutorSafeguard decorator as a class so we can…
davidblain-infrabel Mar 11, 2024
bd33afd
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 11, 2024
80ddf2d
Revert "refactor: Refactored ExecutorSafeguard decorator as a class s…
davidblain-infrabel Mar 11, 2024
b07dfa5
refactor: Refactored ExecutorSafeguard decorator as a class so we can…
davidblain-infrabel Mar 11, 2024
337b08f
refactor: Added docstring to ExecutorSafeguard decorator
davidblain-infrabel Mar 11, 2024
5c3238d
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 11, 2024
39f42e5
docs: Reformatted docstring of ExecutorSafeguard
davidblain-infrabel Mar 11, 2024
ea8a001
refactor: Added missing white line between docstring and test_mode cl…
davidblain-infrabel Mar 11, 2024
9e48aaa
refactor: Fixed unit tests for ExecutorSafeguard
davidblain-infrabel Mar 12, 2024
83105ef
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 12, 2024
43c8c31
refactor: Reformatted baseoperator and test_baseoperatormeta file as …
davidblain-infrabel Mar 13, 2024
6e0a320
fix: Fixed import of partial function from functools
davidblain-infrabel Mar 13, 2024
f8f5450
refactor: Refactored multiple patches into one context manager so it'…
davidblain-infrabel Mar 13, 2024
5b396f1
refactor: Reformatted patch statement in TestExecutorSafeguard
davidblain-infrabel Mar 13, 2024
887c22d
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 13, 2024
b717afd
refactor: Added allow_mixing attribute to BaseOperator and added test…
davidblain-infrabel Mar 13, 2024
f0f5a11
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 13, 2024
ca0957d
refactor: Fixed static checks on baseoperator
davidblain-infrabel Mar 14, 2024
4174fa7
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 14, 2024
21d5fdf
fix: Forgot to also add allow_mixin parameter to partial method in Ba…
davidblain-infrabel Mar 14, 2024
b0d034e
refactor: Trying to fix example in docstring of allow_mixin param in …
davidblain-infrabel Mar 14, 2024
94692bc
refactor: Added bool type to allow_mixing attribute of BaseOperator
davidblain-infrabel Mar 14, 2024
069ffb2
refactor: Added allow_mixin parameter
davidblain-infrabel Mar 14, 2024
3c0aeb6
refactor: Added init file in resources package
davidblain-infrabel Mar 14, 2024
ff8395e
refactor: Changed docstring of BaseOperator to raw string due to back…
davidblain-infrabel Mar 14, 2024
54977bf
fix: Fixed name of allow_mixin parameter
davidblain-infrabel Mar 14, 2024
70a0534
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 14, 2024
ecfbe61
fix: Fixed check on allow_mixin property
davidblain-infrabel Mar 14, 2024
3ded4e7
fix: Fixed allow_mixin property in test
davidblain-infrabel Mar 14, 2024
ac1347e
refactor: Added allow_mixin to schema.json and assertion in test_dag_…
davidblain-infrabel Mar 14, 2024
c38d9d1
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 14, 2024
a619e27
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 14, 2024
2870dd0
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 15, 2024
1b36c8f
refactor: Refactored tests using dag_maker fixture and running them a…
davidblain-infrabel Mar 15, 2024
570e035
refactor: Moved import of Context under type checking
davidblain-infrabel Mar 15, 2024
6894413
refactor: Reformatted TestExecutorSafeguard
davidblain-infrabel Mar 15, 2024
ca3f0c5
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 15, 2024
a278162
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 15, 2024
959c7b2
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 15, 2024
c55de69
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 15, 2024
e139d6d
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 15, 2024
bdeadee
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 15, 2024
a0a3f76
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 17, 2024
9bf315c
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 17, 2024
092657a
refactor: Simplified check on test_mode in wrapper
davidblain-infrabel Mar 17, 2024
1abdf43
refactor: Improved check as a classic operator can also be called thr…
davidblain-infrabel Mar 18, 2024
b4a2a77
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 18, 2024
3163a0f
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 18, 2024
9f99e21
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 18, 2024
d6024ab
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 18, 2024
9b60ed0
refactor: Fixed additional static checks
davidblain-infrabel Mar 19, 2024
b2a2a0f
refactor: Fixed additional static checks
davidblain-infrabel Mar 20, 2024
b34026e
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 20, 2024
8e6a26a
refactor: Use sentinel instead of traceback to detect if execute was …
davidblain-infrabel Mar 21, 2024
8a35ebb
refactor: Use test_mode from TaskInstance
davidblain-infrabel Mar 21, 2024
d310473
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 21, 2024
b7d855d
refactor: Removed unused imports
davidblain-infrabel Mar 21, 2024
5695017
refactor: Put message in one line
davidblain-infrabel Mar 21, 2024
558e1d9
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 21, 2024
acafa62
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 21, 2024
8675919
Revert "refactor: Use test_mode from TaskInstance"
davidblain-infrabel Mar 21, 2024
7955870
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 21, 2024
83237e5
refactor: Put message on one line
davidblain-infrabel Mar 21, 2024
21b7512
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 21, 2024
3dcd775
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 21, 2024
e4b71ef
refactor: Fixed passing of sentinel arg when resume_execution is bein…
davidblain-infrabel Mar 22, 2024
70949f4
refactor: Reformatted test
davidblain-infrabel Mar 22, 2024
a7a6a7e
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 22, 2024
b2eb502
refactor: Check if next_kwargs is not None
davidblain-infrabel Mar 22, 2024
c02c61f
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 22, 2024
4f8ee87
refactor: Fixed DAG example in docstring of chain_linear method
davidblain-infrabel Mar 22, 2024
53d0a51
refactor: Renamed allow_mixin parameter of BaseOperator to allow_nest…
davidblain-infrabel Mar 22, 2024
b7a685c
Merge branch 'main' into feature/mixing-decorated-classic-operators
dabla Mar 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def _get_schema():

metadata = MetaData(schema=_get_schema(), naming_convention=naming_convention)
mapper_registry = registry(metadata=metadata)
_sentinel = object()

Base: Any = mapper_registry.generate_base()

Expand Down
61 changes: 58 additions & 3 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import sys
import warnings
from datetime import datetime, timedelta
from functools import total_ordering, wraps
from inspect import signature
from types import FunctionType
from typing import (
Expand Down Expand Up @@ -75,6 +76,7 @@
DEFAULT_WEIGHT_RULE,
AbstractOperator,
)
from airflow.models.base import _sentinel
from airflow.models.mappedoperator import OperatorPartial, validate_mapping_kwargs
from airflow.models.param import ParamsDict
from airflow.models.pool import Pool
Expand Down Expand Up @@ -215,6 +217,7 @@ def partial(**kwargs):
"weight_rule": DEFAULT_WEIGHT_RULE,
"inlets": [],
"outlets": [],
"allow_nested_operators": True,
}


Expand Down Expand Up @@ -265,6 +268,7 @@ def partial(
doc_yaml: str | None | ArgNotSet = NOTSET,
doc_rst: str | None | ArgNotSet = NOTSET,
logger_name: str | None | ArgNotSet = NOTSET,
allow_nested_operators: bool = True,
**kwargs,
) -> OperatorPartial:
from airflow.models.dag import DagContext
Expand Down Expand Up @@ -331,6 +335,7 @@ def partial(
"doc_rst": doc_rst,
"doc_yaml": doc_yaml,
"logger_name": logger_name,
"allow_nested_operators": allow_nested_operators,
}

# Inject DAG-level default args into args provided to this function.
Expand Down Expand Up @@ -365,6 +370,35 @@ def partial(
)


class ExecutorSafeguard:
"""
The ExecutorSafeguard decorator.

Checks if the execute method of an operator isn't manually called outside
the TaskInstance as we want to avoid bad mixing between decorated and
classic operators.
"""

test_mode = conf.getboolean("core", "unit_test_mode")

@classmethod
def decorator(cls, func):
@wraps(func)
def wrapper(self, *args, **kwargs):
from airflow.decorators.base import DecoratedOperator

sentinel = kwargs.pop(f"{self.__class__.__name__}__sentinel", None)

if not cls.test_mode and not sentinel == _sentinel and not isinstance(self, DecoratedOperator):
message = f"{self.__class__.__name__}.{func.__name__} cannot be called outside TaskInstance!"
if not self.allow_nested_operators:
raise AirflowException(message)
self.log.warning(message)
return func(self, *args, **kwargs)

return wrapper


class BaseOperatorMeta(abc.ABCMeta):
"""Metaclass of BaseOperator."""

Expand Down Expand Up @@ -396,7 +430,7 @@ def _apply_defaults(cls, func: T) -> T:

fixup_decorator_warning_stack(func)

@functools.wraps(func)
@wraps(func)
def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any:
from airflow.models.dag import DagContext
from airflow.utils.task_group import TaskGroupContext
Expand Down Expand Up @@ -464,6 +498,9 @@ def apply_defaults(self: BaseOperator, *args: Any, **kwargs: Any) -> Any:
return cast(T, apply_defaults)

def __new__(cls, name, bases, namespace, **kwargs):
execute_method = namespace.get("execute")
if callable(execute_method) and not getattr(execute_method, "__isabstractmethod__", False):
namespace["execute"] = ExecutorSafeguard().decorator(execute_method)
new_cls = super().__new__(cls, name, bases, namespace, **kwargs)
with contextlib.suppress(KeyError):
# Update the partial descriptor with the class method, so it calls the actual function
Expand All @@ -475,9 +512,9 @@ def __new__(cls, name, bases, namespace, **kwargs):
return new_cls


@functools.total_ordering
@total_ordering
class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
"""
r"""
Abstract base class for all operators.

Since operators create objects that become nodes in the DAG, BaseOperator
Expand Down Expand Up @@ -672,6 +709,21 @@ class derived from this one results in the creation of a task object,
If set to `None` (default), the logger name will fall back to
`airflow.task.operators.{class.__module__}.{class.__name__}` (e.g. SimpleHttpOperator will have
*airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator* as logger).
:param allow_nested_operators: if True, when an operator is executed within another one a warning message
will be logged. If False, then an exception will be raised if the operator is badly used (e.g. nested
within another one). In future releases of Airflow this parameter will be removed and an exception
will always be thrown when operators are nested within each other (default is True).

**Example**: example of a bad operator mixin usage::

@task(provide_context=True)
def say_hello_world(**context):
hello_world_task = BashOperator(
task_id="hello_world_task",
bash_command="python -c \"print('Hello, world!')\"",
dag=dag,
)
hello_world_task.execute(context)
"""

# Implementing Operator.
Expand Down Expand Up @@ -727,6 +779,7 @@ class derived from this one results in the creation of a task object,
"on_skipped_callback",
"do_xcom_push",
"multiple_outputs",
"allow_nested_operators",
}

# Defines if the operator supports lineage without manual definitions
Expand Down Expand Up @@ -807,6 +860,7 @@ def __init__(
doc_yaml: str | None = None,
doc_rst: str | None = None,
logger_name: str | None = None,
allow_nested_operators: bool = True,
**kwargs,
):
from airflow.models.dag import DagContext
Expand Down Expand Up @@ -956,6 +1010,7 @@ def __init__(

self._log_config_logger_name = "airflow.task.operators"
self._logger_name = logger_name
self.allow_nested_operators: bool = allow_nested_operators

# Lineage
self.inlets: list = []
Expand Down
4 changes: 4 additions & 0 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,10 @@ def doc_yaml(self) -> str | None:
def doc_rst(self) -> str | None:
return self.partial_kwargs.get("doc_rst")

@property
def allow_nested_operators(self) -> bool:
return bool(self.partial_kwargs.get("allow_nested_operators"))

def get_dag(self) -> DAG | None:
"""Implement Operator."""
return self.dag
Expand Down
8 changes: 7 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
XComForMappingNotPushed,
)
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import Base, StringID, TaskInstanceDependencies
from airflow.models.base import Base, StringID, TaskInstanceDependencies, _sentinel
from airflow.models.dagbag import DagBag
from airflow.models.log import Log
from airflow.models.mappedoperator import MappedOperator
Expand Down Expand Up @@ -411,11 +411,17 @@ def _execute_task(task_instance: TaskInstance | TaskInstancePydantic, context: C
execute_callable_kwargs: dict[str, Any] = {}
execute_callable: Callable
if task_instance.next_method:
if task_instance.next_method == "execute":
if not task_instance.next_kwargs:
task_instance.next_kwargs = {}
task_instance.next_kwargs[f"{task_to_execute.__class__.__name__}__sentinel"] = _sentinel
execute_callable = task_to_execute.resume_execution
execute_callable_kwargs["next_method"] = task_instance.next_method
execute_callable_kwargs["next_kwargs"] = task_instance.next_kwargs
else:
execute_callable = task_to_execute.execute
if execute_callable.__name__ == "execute":
execute_callable_kwargs[f"{task_to_execute.__class__.__name__}__sentinel"] = _sentinel

def _execute_callable(context, **execute_callable_kwargs):
try:
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/microsoft/azure/serialization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
3 changes: 2 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@
"_is_mapped": { "const": true, "$comment": "only present when True" },
"expand_input": { "type": "object" },
"partial_kwargs": { "type": "object" },
"map_index_template": { "type": "string" }
"map_index_template": { "type": "string" },
"allow_nested_operators": { "type": "boolean" }
},
"dependencies": {
"expand_input": ["partial_kwargs", "_is_mapped"],
Expand Down
Loading
Loading