Showing 6 changed files with 545 additions and 5 deletions.
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.operators.python import task  # noqa # pylint: disable=unused-import
@@ -116,6 +116,46 @@ DAGs can be used as context managers to automatically assign new operators to th
    op.dag is dag  # True

.. _concepts:functional_dags:

Functional DAGs
---------------

DAGs can be defined using functional abstractions. Outputs and inputs are sent between tasks using
:ref:`XCom values <concepts:xcom>`. In addition, you can wrap functions as tasks using the
:ref:`task decorator <concepts:task_decorator>`. Airflow will also automatically add dependencies between
tasks to ensure that XCom messages are available when operators are executed.

Example DAG with functional abstraction:

.. code-block:: python

    with DAG(
        'send_server_ip', default_args=default_args, schedule_interval=None
    ) as dag:

        # Using default connection as it's set to httpbin.org by default
        get_ip = SimpleHttpOperator(
            task_id='get_ip', endpoint='get', method='GET', xcom_push=True
        )

        @dag.task(multiple_outputs=True)
        def prepare_email(raw_json: str) -> Dict[str, str]:
            external_ip = json.loads(raw_json)['origin']
            return {
                'subject': f'Server connected from {external_ip}',
                'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
            }

        email_info = prepare_email(get_ip.output)

        send_email = EmailOperator(
            task_id='send_email',
            to='example@example.com',
            subject=email_info['subject'],
            html_content=email_info['body']
        )

.. _concepts:dagruns:

DAG Runs
@@ -173,6 +213,62 @@ Each task is a node in our DAG, and there is a dependency from task_1 to task_2:
We can say that task_1 is *upstream* of task_2, and conversely task_2 is *downstream* of task_1.
When a DAG Run is created, task_1 will start running and task_2 waits for task_1 to complete successfully before it may start.

.. _concepts:task_decorator:

Python task decorator
---------------------

The Airflow ``task`` decorator converts any Python function into an Airflow operator.
The decorated function can be called once to set the arguments and keyword arguments for operator execution.

.. code-block:: python

    with DAG('my_dag', start_date=datetime(2020, 5, 15)) as dag:

        @dag.task
        def hello_world():
            print('hello world!')

        # Also...
        from airflow.decorators import task

        @task
        def hello_name(name: str):
            print(f'hello {name}!')

        hello_name('Airflow users')

The task decorator captures returned values and sends them to the :ref:`XCom backend <concepts:xcom>`. By default, the
returned value is saved as a single XCom value. You can set the ``multiple_outputs`` keyword argument to ``True`` to
unroll dictionaries, lists or tuples into separate XCom values. This can be used with regular operators to create
:ref:`functional DAGs <concepts:functional_dags>`.
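As a minimal sketch of that unrolling (the task and operator names below are illustrative, not part of this change, and
assume the same imports and DAG context as the example above), each key of the returned dictionary becomes its own XCom
value that can be referenced by key:

.. code-block:: python

    @dag.task(multiple_outputs=True)
    def get_server_info() -> Dict[str, str]:
        # With multiple_outputs=True, each key is stored as a separate XCom value
        return {'host': 'example.org', 'status': 'up'}

    info = get_server_info()

    # Individual keys can be passed to templated fields of regular operators
    notify = EmailOperator(
        task_id='notify',
        to='example@example.com',
        subject=info['host'],        # resolves to the 'host' XCom value at runtime
        html_content=info['status']  # resolves to the 'status' XCom value at runtime
    )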
Calling a decorated function returns an ``XComArg`` instance. You can use it to set templated fields on downstream
operators.
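For instance, a minimal sketch of wiring an ``XComArg`` into a templated field (again with illustrative names, assuming
the DAG context from the examples above); the task dependency is added automatically:

.. code-block:: python

    @dag.task
    def get_recipient() -> str:
        return 'example@example.com'

    recipient = get_recipient()  # an XComArg, not a plain string

    # Assigning the XComArg to a templated field defers resolution to runtime
    # and adds the dependency get_recipient >> greet automatically.
    greet = EmailOperator(
        task_id='greet',
        to=recipient,
        subject='Hello from Airflow',
        html_content='Hi!'
    )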
You can call a decorated function more than once in a DAG. The decorated function will automatically generate
a unique ``task_id`` for each generated operator.

.. code-block:: python

    with DAG('my_dag', start_date=datetime(2020, 5, 15)) as dag:

        @dag.task
        def update_user(user_id: int):
            ...

        # Avoid generating this list dynamically to keep DAG topology stable between DAG runs
        for user_id in user_ids:
            update_user(user_id)
        # This will generate an operator for each user_id

Task ids are generated by appending a number to the original task id. For the above example, the DAG will have
the following task ids: ``[update_user, update_user__1, update_user__2, ... update_user__n]``.

Task Instances
==============