Skip to content

Commit

Permalink
Add another way to dynamically generate DAGs to docs (#21297)
Browse files Browse the repository at this point in the history
Also, move dynamic DAG generation cases from "best practices" to "how to" section
  • Loading branch information
makrushin-evgenii authored Feb 25, 2022
1 parent 608b8c4 commit 5a6316f
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 87 deletions.
92 changes: 5 additions & 87 deletions docs/apache-airflow/best-practices.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ Good example:
Dynamic DAG Generation
----------------------
Sometimes writing DAGs manually isn't practical.
Maybe you have a lot of DAGs that do similar things with just a parameter changing between them.
Or maybe you need a set of DAGs to load tables, but don't want to manually update DAGs every time those tables change.
In these and other cases, it can be more useful to dynamically generate DAGs.

Avoiding excessive processing at the top level code described in the previous chapter is especially important
in case of dynamic DAG configuration, which can be configured essentially in one of those ways:
Expand All @@ -190,93 +194,7 @@ in case of dynamic DAG configuration, which can be configured essentially in one
* via externally provided, generated Python code, containing meta-data in the DAG folder
* via externally provided, generated configuration meta-data file in the DAG folder

All cases are described in the following sections.

Dynamic DAGs with environment variables
.......................................

If you want to use variables to configure your code, you should always use
`environment variables <https://wiki.archlinux.org/title/environment_variables>`_ in your
top-level code rather than :doc:`Airflow Variables </concepts/variables>`. Using Airflow Variables
at top-level code creates a connection to metadata DB of Airflow to fetch the value, which can slow
down parsing and place extra load on the DB. See :ref:`best_practices/airflow_variables`
on how to make best use of Airflow Variables in your DAGs using Jinja templates .

For example you could set ``DEPLOYMENT`` variable differently for your production and development
environments. The variable ``DEPLOYMENT`` could be set to ``PROD`` in your production environment and to
``DEV`` in your development environment. Then you could build your dag differently in production and
development environment, depending on the value of the environment variable.

.. code-block:: python
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
Generating Python code with embedded meta-data
..............................................

You can externally generate Python code containing the meta-data as importable constants.
Such constant can then be imported directly by your DAG and used to construct the object and build
the dependencies. This makes it easy to import such code from multiple DAGs without the need to find,
load and parse the meta-data stored in the constant - this is done automatically by Python interpreter
when it processes the "import" statement. This sounds strange at first, but it is surprisingly easy
to generate such code and make sure this is a valid Python code that you can import from your DAGs.

For example assume you dynamically generate (in your DAG folder), the ``my_company_utils/common.py`` file:

.. code-block:: python
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like that:

.. code-block:: python
import pendulum
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
for task in ALL_TASKS:
# create your operators and relations here
pass
Don't forget that in this case you need to add empty ``__init__.py`` file in the ``my_company_utils`` folder
and you should add the ``my_company_utils/.*`` line to ``.airflowignore`` file, so that the whole folder is
ignored by the scheduler when it looks for DAGs.


Dynamic DAGs with external configuration from a structured data file
....................................................................

If you need to use a more complex meta-data to prepare your DAG structure and you would prefer to keep the
data in a structured non-python format, you should export the data to the DAG folder in a file and push
it to the DAG folder, rather than try to pull the data by the DAG's top-level code - for the reasons
explained in the parent :ref:`best_practices/top_level_code`.

The meta-data should be exported and stored together with the DAGs in a convenient file format (JSON, YAML
formats are good candidates) in DAG folder. Ideally, the meta-data should be published in the same
package/folder as the module of the DAG file you load it from, because then you can find location of
the meta-data file in your DAG easily. The location of the file to read can be found using the
``__file__`` attribute of the module containing the DAG:

.. code-block:: python
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
Some cases of dynamic DAG generation are described in the :doc:`howto/dynamic-dag-generation` section.

.. _best_practices/airflow_variables:

Expand Down
140 changes: 140 additions & 0 deletions docs/apache-airflow/howto/dynamic-dag-generation.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Dynamic DAG Generation
======================

Dynamic DAGs with environment variables
.......................................

If you want to use variables to configure your code, you should always use
`environment variables <https://wiki.archlinux.org/title/environment_variables>`_ in your
top-level code rather than :doc:`Airflow Variables </concepts/variables>`. Using Airflow Variables
at top-level code creates a connection to metadata DB of Airflow to fetch the value, which can slow
down parsing and place extra load on the DB. See the `Airflow Variables <_best_practices/airflow_variables>`_
on how to make best use of Airflow Variables in your DAGs using Jinja templates .

For example you could set ``DEPLOYMENT`` variable differently for your production and development
environments. The variable ``DEPLOYMENT`` could be set to ``PROD`` in your production environment and to
``DEV`` in your development environment. Then you could build your dag differently in production and
development environment, depending on the value of the environment variable.

.. code-block:: python
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
Generating Python code with embedded meta-data
..............................................

You can externally generate Python code containing the meta-data as importable constants.
Such constant can then be imported directly by your DAG and used to construct the object and build
the dependencies. This makes it easy to import such code from multiple DAGs without the need to find,
load and parse the meta-data stored in the constant - this is done automatically by Python interpreter
when it processes the "import" statement. This sounds strange at first, but it is surprisingly easy
to generate such code and make sure this is a valid Python code that you can import from your DAGs.

For example assume you dynamically generate (in your DAG folder), the ``my_company_utils/common.py`` file:

.. code-block:: python
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like that:

.. code-block:: python
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
for task in ALL_TASKS:
# create your operators and relations here
pass
Don't forget that in this case you need to add empty ``__init__.py`` file in the ``my_company_utils`` folder
and you should add the ``my_company_utils/.*`` line to ``.airflowignore`` file, so that the whole folder is
ignored by the scheduler when it looks for DAGs.


Dynamic DAGs with external configuration from a structured data file
....................................................................

If you need to use a more complex meta-data to prepare your DAG structure and you would prefer to keep the
data in a structured non-python format, you should export the data to the DAG folder in a file and push
it to the DAG folder, rather than try to pull the data by the DAG's top-level code - for the reasons
explained in the parent :ref:`best_practices/top_level_code`.

The meta-data should be exported and stored together with the DAGs in a convenient file format (JSON, YAML
formats are good candidates) in DAG folder. Ideally, the meta-data should be published in the same
package/folder as the module of the DAG file you load it from, because then you can find location of
the meta-data file in your DAG easily. The location of the file to read can be found using the
``__file__`` attribute of the module containing the DAG:

.. code-block:: python
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
Dynamic DAGs with ``globals()``
...............................
You can dynamically generate DAGs by working with ``globals()``.
As long as a ``DAG`` object in ``globals()`` is created, Airflow will load it.

.. code-block:: python
from datetime import datetime
from airflow.decorators import dag, task
configs = {
"config1": {"message": "first DAG will receive this message"},
"config2": {"message": "second DAG will receive this message"},
}
for config_name, config in configs.items():
dag_id = f"dynamic_generated_dag_{config_name}"
@dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
def dynamic_generated_dag():
@task
def print_message(message):
print(message)
print_message(config["message"])
globals()[dag_id] = dynamic_generated_dag()
The code below will generate a DAG for each config: ``dynamic_generated_dag_config1`` and ``dynamic_generated_dag_config2``.
Each of them can run separately with related configuration

.. warning::
Using this practice, pay attention to "late binding" behaviour in Python loops. See `that GitHub discussion <https://github.com/apache/airflow/discussions/21278#discussioncomment-2103559>`_ for more details
1 change: 1 addition & 0 deletions docs/apache-airflow/howto/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ configuring an Airflow environment.
use-test-config
define_extra_link
email-config
dynamic-dag-generation

0 comments on commit 5a6316f

Please sign in to comment.