
Mapped TriggerDagRunOperator causes SerializationError due to operator_extra_links 'property' object is not iterable #24653

Closed
consciencee opened this issue Jun 25, 2022 · 1 comment · Fixed by #24676
Labels: area:core, kind:bug

Comments

@consciencee

Apache Airflow version

2.3.2 (latest released)

What happened

Hi, I have an issue with launching several sub-DAGs via a mapped TriggerDagRunOperator (mapping over the conf parameter). Here is a demo example of my typical DAG:

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2),
) as dag:

    # Produces a list of dicts; each dict becomes the conf of one triggered run.
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: [{"x": i} for i in range(10)],
    )

    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag',
    ).expand(conf=XComArg(t1))

    t1 >> t2

But when Airflow tries to import such a DAG, it throws the following SerializationError (which I observed both in the UI and in $AIRFLOW_HOME/logs/scheduler/latest/<my_dag_name>.py.log):

Broken DAG: [/home/aliona/airflow/dags/triggerer_dag.py] Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 638, in _serialize_node
    serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links(
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 933, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1106, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/aliona/airflow/venv/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1014, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'triggerer': 'property' object is not iterable
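
For what it's worth, the TypeError itself can be reproduced without Airflow at all. Here is a minimal sketch (using a hypothetical Demo class as a stand-in for an operator that declares operator_extra_links as a property) showing that iterating over an unbound property descriptor fails in exactly this way:

class Demo:
    # Stand-in for an operator that declares its extra links as a property.
    @property
    def operator_extra_links(self):
        return ['Triggered DAG']

# Looking the attribute up on the class (rather than on an instance) returns
# the property descriptor itself, not the list it would compute:
links = Demo.operator_extra_links
print(type(links))  # <class 'property'>

for link in links:  # TypeError: 'property' object is not iterable
    pass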

How it appears in the UI:
[screenshot: the "Broken DAG" error banner in the Airflow UI]

What you think should happen instead

I think that a TriggerDagRunOperator mapped over its conf parameter should serialize and work correctly by default.

During debugging, I found out that a simple non-mapped TriggerDagRunOperator has the value ['Triggered DAG'] in its operator_extra_links field, so it is a list. But for a mapped TriggerDagRunOperator, it is a 'property' object. I have no idea why Airflow cannot resolve the value of this property during serialization, but I reinitialized the field with the value ['Triggered DAG'] and that fixed the issue for me.
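
To see the difference described above, here is a minimal sketch (assuming Airflow 2.3.2; the values in the comments are what this report suggests, not verified output):

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

plain = TriggerDagRunOperator(task_id='plain', trigger_dag_id='mydag')
# On a regular operator the attribute resolves through the instance,
# so it yields the actual list of extra links:
print(plain.operator_extra_links)

mapped = TriggerDagRunOperator.partial(
    task_id='mapped', trigger_dag_id='mydag'
).expand(conf=[{'x': 1}])
# On the mapped operator the attribute appears to have been captured from
# the class, where it is still the unresolved property descriptor:
print(mapped.operator_extra_links)  # e.g. <property object at 0x...>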

For now, in every case where I use a mapped TriggerDagRunOperator, I also add code like this at the end of my DAG file:

# here 'second' is the task_id of the mapped TriggerDagRunOperator task (see demo code above)
t2_patch = dag.task_dict['second']
t2_patch.operator_extra_links = ['Triggered DAG']
dag.task_dict.update({'second': t2_patch})

So for every mapped TriggerDagRunOperator task I manually set the operator_extra_links property to ['Triggered DAG'], and as a result there is no SerializationError. I have a lot of such cases, and all of them work well with this fix: all sub-DAGs are launched and the mapped configuration is passed correctly. Whether or not I wait for the triggered runs to finish, both options also work correctly.
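
If this workaround is needed in many DAG files, it could be factored into a small helper. Below is a hypothetical patch_mapped_trigger_links function (my own sketch, not an Airflow API), assuming operator_extra_links is writable on mapped tasks, as the snippet above demonstrates:

from airflow.models.mappedoperator import MappedOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def patch_mapped_trigger_links(dag):
    # Hypothetical helper: reset operator_extra_links on every mapped
    # TriggerDagRunOperator in the DAG so that serialization no longer
    # trips over the unresolved 'property' object.
    for task in dag.task_dict.values():
        if isinstance(task, MappedOperator) and task.operator_class is TriggerDagRunOperator:
            task.operator_extra_links = ['Triggered DAG']

# Call it at the end of the DAG file, after all tasks are defined:
patch_mapped_trigger_links(dag)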

How to reproduce

  1. Create a DAG with mapped TriggerDagRunOperator tasks: keep the main parameters such as task_id and trigger_dag_id in the partial() call, and pass a non-empty iterable to the conf parameter in the expand() call. For example:

     t2 = TriggerDagRunOperator.partial(
         task_id='second',
         trigger_dag_id='mydag'
     ).expand(conf=[{'x': 1}])

  2. Try to serialize the DAG; the error will appear.

The full example of a failing DAG file:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

with DAG(
    'triggerer',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2),
) as dag:

    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: [{"x": i} for i in range(10)],
    )

    t2 = TriggerDagRunOperator.partial(
        task_id='second',
        trigger_dag_id='mydag',
    ).expand(conf=[{'a': 1}])

    t1 >> t2

# uncomment these lines to work around the error
# t2_patch = dag.task_dict['second']
# t2_patch.operator_extra_links = ['Triggered DAG']
# dag.task_dict.update({'second': t2_patch})

As the sub-DAG ('mydag') I use this DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    'mydag',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 12, 2),
) as dag:
    t1 = PythonOperator(
        task_id='first',
        python_callable=lambda: print("first"),
    )
    t2 = PythonOperator(
        task_id='second',
        python_callable=lambda: print("second"),
    )
    t1 >> t2

Operating System

Ubuntu 22.04 LTS

Versions of Apache Airflow Providers

apache-airflow-providers-ftp==2.1.2
apache-airflow-providers-http==2.1.2
apache-airflow-providers-imap==2.2.3
apache-airflow-providers-sqlite==2.1.3

Deployment

Virtualenv installation

Deployment details

Python 3.10.4
pip 22.0.2

Anything else

Currently, for demonstration purposes, I am using a fully local Airflow installation: single node, SequentialExecutor, and a SQLite database backend. But the same issue also appeared on a multi-node installation with either CeleryExecutor or LocalExecutor and a PostgreSQL database backend.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
@boring-cyborg

boring-cyborg bot commented Jun 25, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!
