
MappedOperator doesn't allow operator_extra_links instance property #31902

Closed
harrisonharris-di opened this issue Jun 14, 2023 · 2 comments · Fixed by #31904
Labels
affected_version:2.6 Issues Reported for 2.6 area:serialization kind:bug This is a clearly a bug

Comments

@harrisonharris-di

Apache Airflow version

2.6.1

What happened

When calling .partial() and .expand() on any operator that defines operator_extra_links as an instance property (e.g. via @property), MappedOperator does not handle it properly, causing the DAG to fail to import.

The BatchOperator from airflow.providers.amazon.aws.operators.batch is one example of an operator which defines operator_extra_links on a per-instance basis.

What you think should happen instead

The DAG should not fail to import (especially when using the AWS BatchOperator!). Either:

  • If per-instance operator_extra_links is deemed disallowed behaviour
    • MappedOperator should detect it's a property and give a more helpful error message
    • BatchOperator from the AWS provider should be changed. If I need to open another ticket elsewhere for that please let me know
  • If per-instance operator_extra_links is allowed
    • MappedOperator needs to be adjusted to account for that
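For the first option, here is a minimal sketch (not actual Airflow code; `check_extra_links` and the operator classes are hypothetical names) of how a property-valued operator_extra_links could be detected up front to produce a clearer error:

```python
import inspect


def check_extra_links(operator_class):
    """Sketch: reject operators whose operator_extra_links is a @property.

    inspect.getattr_static reads the attribute without invoking the
    descriptor protocol, so a @property shows up here as a `property` object.
    """
    attr = inspect.getattr_static(operator_class, "operator_extra_links", None)
    if isinstance(attr, property):
        raise TypeError(
            f"{operator_class.__name__}.operator_extra_links is an instance "
            "property, which MappedOperator cannot serialize; define it as a "
            "class-level tuple instead."
        )


class PlainOperator:
    operator_extra_links = ()


class PropertyOperator:
    @property
    def operator_extra_links(self):
        return ()


check_extra_links(PlainOperator)       # passes silently
# check_extra_links(PropertyOperator)  # would raise the TypeError above
```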

How to reproduce

import pendulum
from airflow.models.baseoperator import BaseOperator
from airflow.decorators import dag, task
class BadOperator(BaseOperator):
    def __init__(self, *args, some_argument: str, **kwargs):
        super().__init__(*args, **kwargs)
        self.some_argument = some_argument

    @property
    def operator_extra_links(self):
        # !PROBLEMATIC FUNCTION!
        # ... Some code to create a collection of `BaseOperatorLink`s dynamically
        return tuple()

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"]
)
def airflow_is_bad():
    """
    Example to demonstrate issue with airflow API
    """

    @task
    def create_arguments():
        return [1,2,3,4]

    bad_operator_test_group = BadOperator.partial(
        task_id="bad_operator_test_group",
    ).expand(some_argument=create_arguments())

dag = airflow_is_bad()

Put this in your dags folder; Airflow will fail to import the DAG with the following error:

Broken DAG: [<USER>/airflow/dags/airflow_is_bad_minimal_example.py] Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 825, in _serialize_node
    serialize_op["_operator_extra_links"] = cls._serialize_operator_extra_links(
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py", line 1165, in _serialize_operator_extra_links
    for operator_extra_link in operator_extra_links:
TypeError: 'property' object is not iterable

Commenting out the operator_extra_links property on BadOperator in the example allows the DAG to be imported fine.
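The traceback is consistent with the serializer reading operator_extra_links at class level rather than from an instance: class-level access of a @property returns the property descriptor itself, which is not iterable. This can be reproduced without Airflow (a plain-Python sketch, not the actual serialization code):

```python
class WithProperty:
    @property
    def operator_extra_links(self):
        return ()


class WithClassAttr:
    operator_extra_links = ()


# Instance access triggers the descriptor and returns the tuple.
assert WithProperty().operator_extra_links == ()

# Class-level access returns the property object itself...
print(type(WithProperty.operator_extra_links))  # <class 'property'>

# ...which raises exactly the error from the traceback when iterated:
try:
    for link in WithProperty.operator_extra_links:
        pass
except TypeError as exc:
    print(exc)  # 'property' object is not iterable

# A class-level tuple, by contrast, is iterable from either access path.
assert list(WithClassAttr.operator_extra_links) == []
```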

Operating System

macOS Ventura 13.4

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.2

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

I found that changing operator_extra_links on BatchOperator to a class attribute, operator_extra_links = (BatchJobDetailsLink(), BatchJobDefinitionLink(), BatchJobQueueLink(), CloudWatchEventsLink()), solved my issue and made it run fine. However, I have no idea whether that is safe or generalises, because I am not sure what operator_extra_links is actually used for internally.
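The before/after shape of that workaround, sketched with placeholder link classes rather than the real AWS provider code:

```python
class BaseOperatorLink:  # placeholder for airflow.models.BaseOperatorLink
    pass


class BatchJobDetailsLink(BaseOperatorLink): ...
class BatchJobDefinitionLink(BaseOperatorLink): ...


# Before: a per-instance @property. Reading it off the class yields the
# property descriptor, which breaks MappedOperator serialization.
class BatchOperatorBefore:
    @property
    def operator_extra_links(self):
        return (BatchJobDetailsLink(), BatchJobDefinitionLink())


# After: a class-level tuple, iterable whether read from the class or an
# instance.
class BatchOperatorAfter:
    operator_extra_links = (BatchJobDetailsLink(), BatchJobDefinitionLink())
```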

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@harrisonharris-di harrisonharris-di added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jun 14, 2023
@boring-cyborg

boring-cyborg bot commented Jun 14, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@asherkhb-ktx

It seems this isn't actually entirely resolved, specifically with respect to the Amazon Batch operator mentioned in the original issue. In that case, DAG imports still fail because the property contains if conditions based on self, which cannot be resolved on the MappedOperator.
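A plain-Python sketch of the failure mode described here (all names hypothetical, no Airflow involved): a property whose body branches on instance attributes raises when evaluated against an object that never set them, analogous to evaluating it on a MappedOperator instead of the real operator.

```python
class ConditionalLinks:
    def __init__(self, array_job: bool):
        self.array_job = array_job

    @property
    def operator_extra_links(self):
        # Branch on instance state, as the Amazon Batch operator's
        # property reportedly does.
        if self.array_job:
            return ("array-job-link",)
        return ("single-job-link",)


class MappedStandIn:
    # Borrow the property but never set `array_job`, mimicking a
    # MappedOperator that is not a real instance of the operator class.
    operator_extra_links = ConditionalLinks.operator_extra_links


assert ConditionalLinks(True).operator_extra_links == ("array-job-link",)

try:
    MappedStandIn().operator_extra_links
except AttributeError as exc:
    print(exc)  # e.g. 'MappedStandIn' object has no attribute 'array_job'
```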
