Apply task instance mutation hook consistently #38440

Conversation

jscheffl
Contributor

We are using Cluster Policies, specifically the "Task Instance Mutation" hook, to route workload to the respective endpoint. "Respective endpoint" means that we use multiple Celery queues and distribute the work across them. As the distribution is based on workflow metadata and we don't want to add the routing complexity into the workflow itself (statically modelling the workflow for all routing combinations), the task instance mutation hook is the only option.

As discussed in #32471, the task instance mutation generally works "well" for the first execution, but we saw a couple of errors:

  • When using task_instance_mutation_hook, the UI in DAGs->Grid->Task Details->More Details always shows the queue value from the task definition, not the mutated value that is actually stored in the DB. Worse still, when navigating in the UI, the existing queue value in the DB is reset to the standard queue value without the hook applied.
  • When a task fails, the retry does not apply the mutation hook and the task goes back to the standard queue.
  • When using dynamic task mapping, only the first mapped task receives the queue from the mutation hook (task instances created later during mapping do not).

The root cause is that, after the initial task instance creation, defaults are re-loaded from the Python task definition many times on multiple levels. The culprit appears to be TaskInstance._refresh_from_task().

Fixing these two lines as in this PR removes all the problems described above. The trade-off is that the policy code is executed much more often, but assuming the policy itself is not implemented with heavy overhead, this should not cause a noticeable performance impact.
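To illustrate the direction of the change, here is a minimal sketch (simplified names and signature, not the literal diff of this PR, and assuming the hook is reachable via airflow.settings once local settings are loaded): re-apply the mutation hook whenever defaults are copied from the task definition back onto the task instance.

# Sketch only, not the exact diff of this PR: the idea is to re-apply the
# cluster policy every time a task instance re-reads defaults from its task
# definition, so that mutated fields such as "queue" are not silently reverted.
from airflow import settings  # assumption: local settings expose the hook here
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance


def refresh_from_task(ti: TaskInstance, task: BaseOperator, pool_override=None) -> None:
    # ... existing logic copying defaults from the task definition ...
    ti.queue = task.queue
    ti.pool = pool_override or task.pool
    ti.executor_config = task.executor_config
    # Re-apply the mutation hook so the mutated values survive UI refreshes,
    # retries and the expansion of dynamically mapped tasks.
    settings.task_instance_mutation_hook(ti)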

How to test:

  • Apply a cluster policy that changes the queue on some (or all :-D) tasks
  • Use, for example, the example_params_trigger_ui DAG and introduce some random errors in the code (example attached below).
  • Run this and ensure you have Celery workers serving the default queue and the "other_queue". When testing, I set an env variable QUEUE for each worker so the DAG can print which queue it ran on.
  • Check the logs of failed tasks, of mapped tasks other than the first one, and the UI display of the "queue" field.

closes: #32471

FYI @AutomationDev85 @wolfdn @clellmann

Example cluster policy used for testing (placed in airflow_local_settings.py):

from airflow.models.taskinstance import TaskInstance
def task_instance_mutation_hook(task_instance: TaskInstance):
    print("################# POLICY IS APPLIED! ##################################")
    task_instance.queue = "other_queue"
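The policy above is intentionally minimal. As mentioned earlier, the real distribution is based on workflow metadata rather than a hard-coded queue; a purely hypothetical variant (the "heavy_" dag-id prefix and the queue name are made-up examples) could look like:

# Hypothetical, illustrative variant: route based on workflow metadata
# (here the dag_id) instead of hard-coding a single queue for all tasks.
from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance):
    # "heavy_" and "other_queue" are made-up names for this sketch.
    if task_instance.dag_id.startswith("heavy_"):
        task_instance.queue = "other_queue"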

Modified DAG for testing - example_params_trigger_ui.py:

"""Modified copy of the example_params_trigger_ui example DAG with random failures injected for testing the cluster policy."""

from __future__ import annotations

import datetime
from random import randint
from pathlib import Path
from os import getenv
from typing import TYPE_CHECKING

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule

if TYPE_CHECKING:
    from airflow.models.dagrun import DagRun
    from airflow.models.taskinstance import TaskInstance

def print_where_executed():
    print("####################################################")
    print(f"This taks is executed on queue {getenv('QUEUE', 'UNDEFINED!')}")
    print("####################################################")

with DAG(
    dag_id=Path(__file__).stem,
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example_ui"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", retries=4, retry_delay=5.0)
    def get_names(**kwargs) -> list[str]:
        ti: TaskInstance = kwargs["ti"]
        dag_run: DagRun = ti.dag_run
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        if "names" not in dag_run.conf:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return dag_run.conf["names"]

    @task.branch(task_id="select_languages", retries=4, retry_delay=5.0)
    def select_languages(**kwargs) -> list[str]:
        ti: TaskInstance = kwargs["ti"]
        dag_run: DagRun = ti.dag_run
        selected_languages = []
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        for lang in ["english", "german", "french"]:
            if lang in dag_run.conf and dag_run.conf[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", retries=4, retry_delay=5.0)
    def generate_english_greeting(name: str) -> str:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", retries=4, retry_delay=5.0)
    def generate_german_greeting(name: str) -> str:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", retries=4, retry_delay=5.0)
    def generate_french_greeting(name: str) -> str:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", trigger_rule=TriggerRule.ALL_DONE, retries=4, retry_delay=5.0)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)

@jscheffl jscheffl requested a review from potiuk March 24, 2024 17:33
@jscheffl jscheffl added the "area:Scheduler including HA (high availability) scheduler", "type:bug-fix (Changelog: Bug Fixes)" and "area:core" labels Mar 24, 2024
@potiuk
Member

potiuk commented Mar 24, 2024

NIT: Maybe a test case @jscheffl ?

@jscheffl jscheffl force-pushed the bugfix/32471-ensure-cluster-polisies-are-applied-consistently branch from 9fd5280 to 91d8a14 Compare March 25, 2024 22:53
@potiuk
Member

potiuk commented Mar 25, 2024

nice!

@potiuk potiuk merged commit 615e1ec into apache:main Mar 26, 2024
46 checks passed
utkarsharma2 pushed a commit to astronomer/airflow that referenced this pull request Apr 22, 2024
* Apply task instance mutation hook consistently

* Add test for cluster policy applied in pytest
@ephraimbuddy ephraimbuddy added this to the Airflow 2.9.2 milestone Jun 3, 2024