Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Serialized object property #25248

Closed
wants to merge 1 commit into from

Conversation

Taragolis
Copy link
Contributor

At that moment Airflow can't serialize DAG with dynamic tasks if operator_extra_links set as property
related: #25243, #25215, #24676

I've tried to add get actual values of property.


It works in simple cases: #25215, #24676

from pendulum import datetime

from airflow.decorators import dag
from airflow.sensors.external_task import ExternalTaskSensor


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def external_task_sensor():
    ExternalTaskSensor.partial(
        task_id='wait',
    ).expand(external_dag_id=["dag_1", "dag_2", "dag_3"])

_ = external_task_sensor()

image

However there is no reason use property operator_extra_links in this cases


It still not completely help in case when property uses for dynamic links selections such as: #25243

from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.batch import BatchOperator


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def batchop_dtm():
    BatchOperator.partial(
        task_id='submit_batch_job',
        job_queue="batch_job_queue_name",
        job_definition="batch_job_definition_name",
        overrides={},
        # Set this flag to False, so we can test the sensor below
        wait_for_completion=False,
    ).expand(job_name=["job_1", "job_2", "job_3"])


_ = batchop_dtm()

With this PoC error changed, airflow.models.mappedoperator.MappedOperator doesn't have access to attributes of Task

Broken DAG: [/files/dags/batch_mapping.py] Traceback (most recent call last):
  File "/opt/airflow/airflow/utils/helpers.py", line 390, in resolve_property_value
    return prop.fget(obj)
  File "/opt/airflow/airflow/providers/amazon/aws/operators/batch.py", line 117, in operator_extra_links
    if self.wait_for_completion:
AttributeError: type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1178, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1086, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'batchop_dtm': type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'

cc: @josh-fell

@Taragolis Taragolis force-pushed the serialized-object-property branch from 2985072 to dafd7db Compare July 22, 2022 23:47
@Taragolis
Copy link
Contributor Author

BTW, seems like all XCom based External Links won't work with dynamic tasks.

@Taragolis
Copy link
Contributor Author

List of External Links which define operator_extra_links as property

@potiuk
Copy link
Member

potiuk commented Jul 23, 2022

Nice. I saw those issues too. I know it's draft but a look at that might be useful @uranusjr :)

@Taragolis
Copy link
Contributor Author

I found that Extra Links do not work with dynamic tasks at all.
Extra links assign to parent task instance (i do not know how to correct name this TI) but not to actual mapped TIs.
As result we only have number extra links defined in operator not (number extra links defined in operator) x number of mapped TIs

Sample DAG

from pendulum import datetime

from airflow.decorators import dag
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator

EXTERNAL_DAG_IDS = [f"example_external_dag_{ix:02d}" for ix in range(3)]
DAG_KWARGS = {
    "start_date": datetime(2022, 7, 1),
    "schedule_interval": "@daily",
    "catchup": False,
    "tags": ["mapped_extra_links", "AIP-42", "serialization"],
}


def external_dags():
    EmptyOperator(task_id="dummy")


@dag(**DAG_KWARGS)
def external_regular_task_sensor():
    for external_dag_id in EXTERNAL_DAG_IDS:
        ExternalTaskSensor(
            task_id=f'wait_for_{external_dag_id}',
            external_dag_id=external_dag_id,
            poke_interval=5,
        )


@dag(**DAG_KWARGS)
def external_mapped_task_sensor():
    ExternalTaskSensor.partial(
        task_id='wait',
        poke_interval=5,
    ).expand(external_dag_id=EXTERNAL_DAG_IDS)


dag_external_regular_task_sensor = external_regular_task_sensor()
dag_external_mapped_task_sensor = external_mapped_task_sensor()

for dag_id in EXTERNAL_DAG_IDS:
    globals()[dag_id] = dag(dag_id=dag_id, **DAG_KWARGS)(external_dags)()
Video Demo
mapped_extra_links.mp4

Might be better do not serialize Extra Links for dynamic tasks at all?
WDYT @potiuk @uranusjr

Comment on lines +381 to +391
def resolve_property_value(obj, prop):
"""Return class property value.

:param obj: Reference to class.
:param prop: Reference to class property.
:returns: If ``prop`` property than return property value,
otherwise it returns class attribute value.
"""
if isinstance(prop, property):
return prop.fget(obj)
return prop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can live directly in serialized_objects instead (and be a private function)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify - is it related to Operator?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? Not sure I understand the question.

Copy link
Contributor Author

@Taragolis Taragolis Jul 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for confuse you. Initially I think you mean add some implementation __serialized_fields in BatchOperator.
Now I understand what you mean place in serialized_objects module.

@Taragolis
Copy link
Contributor Author

@potiuk @uranusjr Do we need this PR or I could close it?

The only reason why I created this PR, try to get property values for extra_link property but this still can't be for BatchOperator. Seems it might solved by #25332

Also I could make changes in BatchOperator and define as tuple. In this case with some arguments in Operator some links always would greyed however it might be better rather than have serialisation error on mapped tasks.

@potiuk
Copy link
Member

potiuk commented Aug 3, 2022

Yeah. I think @uranusjr works on fixing this problem "properly" in #25500 . Closing this one.

@potiuk potiuk closed this Aug 3, 2022
@Taragolis Taragolis deleted the serialized-object-property branch August 3, 2022 11:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants