Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIP-31] Create @task decorator for functionally defined operators #8057

Closed
casassg opened this issue Apr 2, 2020 · 0 comments · Fixed by #8962
Closed

[AIP-31] Create @task decorator for functionally defined operators #8057

casassg opened this issue Apr 2, 2020 · 0 comments · Fixed by #8962
Assignees
Labels
AIP-31 Task Flow API for nicer DAG definition kind:feature Feature Requests

Comments

@casassg
Copy link
Contributor

casassg commented Apr 2, 2020

Description

Add simple way to wrap a function into a PythonFunctionalOperator.

  • Should be used without args or with args/kwargs for the underlying operator:
@task
def simple_task(...):

@task(dag=dag)
def simple_task(...):
  • Task ID should be the function name by default.
  • Should wrap the function so that autocomplete can still suggest parameters correctly.
  • Decorator should return an instance of PythonFunctionalOperator. This can be used to set task dependencies. Ex:
@task 
def simple_task(...)
  pass

simple_task >> another_task

Example implemetation: https://github.com/casassg/corrent/blob/master/corrent/decorators.py

Ways to use it:

  • @airflow.task: Lazy imported from main Airflow module (real location airflow.decorators.task). Makes function an operator, but does not automatically assign it to a DAG (unless declared inside a DAG context)
  • @dag.task: As a partial function from DAG class. Task automatically assigned to DAG.

Use case / motivation

  • Enable easy transformation of python functions into PythonFunctionalOperators by making a decorator that takes a function and converts it into a PythonFunctionalOperator.

Stretch goals

  • Modify signature type hints (if any) to include XComArg for MyPy to not give issues. Example:
@task 
def simple_task(text:str):
   pass

inspect.signature(simple_task) -> (text:Union[str, XComArg])

Related Issues

Blocked by: #8056

@casassg casassg added the kind:feature Feature Requests label Apr 2, 2020
@turbaszek turbaszek added the AIP-31 Task Flow API for nicer DAG definition label Apr 2, 2020
kaxil pushed a commit that referenced this issue Jun 23, 2020
kaxil pushed a commit to kaxil/airflow that referenced this issue Jun 27, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AIP-31 Task Flow API for nicer DAG definition kind:feature Feature Requests
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants