-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add PythonVirtualenvDecorator to Taskflow API #14761
Conversation
The Workflow run is cancelling this PR. Building image for the PR has been cancelled |
4d2a578
to
d4ff3f9
Compare
packages and system libraries of the Airflow worker. | ||
|
||
To use a docker image with the Taskflow API, change the decorator to ``@task.docker`` | ||
and add any needed arguments to correctly run the task. Please note that the docker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about listing some arguments instead of using any
? As a new users I would not be sure what the arguments can be
airflow/decorators/__init__.py
Outdated
cap_add=cap_add, | ||
extra_hosts=extra_hosts, | ||
**kwargs, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I correctly understand most of those arguments are for DockerOperator. I'm wondering if it would make sense to generate the method automatically using DockerOperator class? This would have to advantages:
- we would be sure that any change in signature in DockerOperator is automatically reflected in
task.docker
- the mechanism can be reused for other operators/decorators
Of course "explicit is better than implicit" but "missing" arguments that users would like to use are a common case in operators world. WDYT @dimberman ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turbaszek what would it look like to generate the function automatically? Ideally a system where we can keep parity with the DockerOperator would be great
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turbaszek I'm moving docker into a separate PR
if not callable(python_callable): | ||
raise TypeError('`python_callable` param must be callable') | ||
if 'self' in signature(python_callable).parameters.keys(): | ||
raise AirflowException('@task does not support methods') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about class methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turbaszek what do you mean? I imagine we want to keep parity with how the task decorator currently works
raise AirflowException( | ||
'Returned dictionary keys must be strings when using ' | ||
f'multiple_outputs, found {key} ({type(key)}) instead' | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we cover this somewhere in docs or users have to learn the hard way? 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turbaszek isn't this part of the taskflow API docs?
:type op_args: list | ||
:param op_kwargs: A dict of keyword arguments to pass to python_callable. | ||
:type op_kwargs: dict | ||
:param string_args: Strings that are present in the global var virtualenv_string_args, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose and usage of virtualenv_string_args
isn't immediately obvious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jhtimmins I'm not quite sure either, but I'm just copying the PythonVirtualEnvOperator API
airflow/decorators/__init__.py
Outdated
:type cap_add: list[str] | ||
""" | ||
return _docker_task( | ||
python_callable=python_callable, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After the python callable, should we enforce keyword-only arguments?
if not self.multiple_outputs: | ||
return return_value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if not self.multiple_outputs: | |
return return_value | |
if self.multiple_outputs: |
Since the return value isn't used, it isn't necessary to return.
airflow/operators/python.py
Outdated
@@ -107,7 +105,7 @@ class PythonOperator(BaseOperator): | |||
|
|||
template_fields = ('templates_dict', 'op_args', 'op_kwargs') | |||
template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"} | |||
ui_color = PYTHON_OPERATOR_UI_COLOR | |||
ui_color = '#ffefeb' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The color is currently a magic value. I'd assign it to a constant to indicate what color it is.
BLUE = '#ffefeb'
ui_color = BLUE
num_paren = num_paren + 1 | ||
elif current == ")": | ||
num_paren = num_paren - 1 | ||
return ''.join(after_decorator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may not matter, but this will fail if any of the input args have a parentheses value within a string, such as :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternative approach:
def remove_task_decorator(python_source: str, task_decorator_name: str) -> str:
"""
Removed @task.virtualenv
:param python_source:
"""
func_start = source.find("def ")
decorators = source[:func_start]
decorated = "@".join(d for d in decorators.split("@") if not d.startswith(task_decorator_name))
return decorated + source[func_start:]
Honestly this doesn't matter but I wanted to see if there was a clear alternative way to do it and here it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jhtimmins unfortunately this doesn't seem to work if you look at the tests (e.g. if you're nesting decorators)
Using the Taskflow API with Docker or Virtual Environments | ||
---------------------------------------------------------- | ||
|
||
As of Airflow <Airflow version>, you will have the ability to use the Taskflow API with either a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Update the version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jhtimmins this unfortunately can't be done until we know which version we add it to.
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
1 similar comment
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
8e805a8
to
40deb6c
Compare
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*. |
88d85fd
to
e07483b
Compare
To improve the usability of the TaskFlow API, we will add the ability to define virtualenv environments so users can run tasks with environments that do not match that of the Airflow system
e07483b
to
c05f0e5
Compare
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*. |
To improve the usability of the TaskFlow API, we will add the ability to
define virtualenv or docker environments so users can run tasks with
environments that do not match that of the Airflow system.
Example:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.