Dynamic tasks marked as upstream_failed when none of their upstream tasks are failed or upstream_failed #27449

Closed
1 of 2 tasks
matthewblock opened this issue Nov 1, 2022 · 19 comments · Fixed by #27506
Labels
affected_version:2.4 Issues Reported for 2.4 area:dynamic-task-mapping AIP-42 kind:bug This is a clearly a bug

Comments

@matthewblock

matthewblock commented Nov 1, 2022

Apache Airflow version

2.4.2

What happened

A mapped task is getting marked as upstream_failed when none of its upstream tasks are failed or upstream_failed.

[Graph view screenshot of the DAG]

In the above graph view, if first_task finishes before second_task, first_task immediately tries to expand middle_task. Note that this is an important step to reproduce: the order in which the tasks finish matters.

Note that the value of the Airflow configuration variable schedule_after_task_execution must be True (the default) for this to occur.
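
To confirm the value in the environment where this happens, the standard config accessor can be used (a quick sketch, assuming a working Airflow installation):

# Quick sanity check of the relevant setting; conf.getboolean reads the same
# sources the scheduler uses (airflow.cfg and AIRFLOW__* environment variables).
from airflow.configuration import conf

print(conf.getboolean("scheduler", "schedule_after_task_execution"))  # True by default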

The expansion occurs when the Task supervisor performs the "mini scheduler", in this line in dagrun.py.

The expansion then marks middle_task as upstream_failed in this line in mappedoperator.py:

                # If the map length cannot be calculated (due to unavailable
                # upstream sources), fail the unmapped task.
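
Roughly, the failing path looks like this (a simplified paraphrase for illustration only, not the actual mappedoperator.py source; the function and helper names are made up):

# Illustrative sketch only - the names here are hypothetical, not Airflow internals.
def expand_or_fail(unmapped_ti, map_length):
    # Expand a mapped task, or fail it when the map length is unknown.
    if map_length is None:
        # If the map length cannot be calculated (due to unavailable
        # upstream sources), fail the unmapped task.
        unmapped_ti.state = "upstream_failed"
        return [unmapped_ti]
    # Otherwise create one task instance per mapped index.
    return [unmapped_ti.with_map_index(i) for i in range(map_length)]  # hypothetical helper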

I believe this was introduced by the PR Fail task if mapping upstream fails.

What you think should happen instead

The dynamic tasks should execute successfully. I don't think the mapped task should be expanded at that point, because its upstream task hasn't completed yet; if the upstream task completed earlier, the expansion would succeed.

How to reproduce

Execute this DAG, making sure the Airflow configuration option schedule_after_task_execution is set to its default value of True.

from datetime import datetime, timedelta
import time

from airflow import DAG, XComArg
from airflow.operators.python import PythonOperator


class PrintIdOperator(PythonOperator):
    # PythonOperator subclass that exposes "id" as a mappable constructor argument.
    def __init__(self, id, **kwargs) -> None:
        super().__init__(**kwargs)
        self.op_kwargs["id"] = id


DAG_ID = "test_upstream_failed_on_mapped_operator_expansion"


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=1),
    "retries": 0,
}


def nop(id):
    print(f"{id=}")


def get_ids(delay: int = 0):
    print(f"Delaying {delay} seconds...")
    time.sleep(delay)
    print("Done!")
    return [0, 1, 2]


with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    start_date=datetime(2022, 8, 3),
    catchup=False,
    schedule=None,
    max_active_runs=1,
) as dag:

    # Delayed so that it finishes after first_task; middle_task maps over its output.
    second_task = PythonOperator(
        task_id="second_task",
        python_callable=get_ids,
        op_kwargs={"delay": 10},
    )

    # Finishes immediately; its "mini scheduler" run then tries to expand
    # middle_task before second_task has pushed the XCom it maps over.
    first_task = PythonOperator(
        task_id="first_task",
        python_callable=get_ids,
    )

    # Mapped over second_task's return value.
    middle_task = PrintIdOperator.partial(
        task_id="middle_task",
        python_callable=nop,
    ).expand(id=XComArg(second_task))

    last_task = PythonOperator(
        task_id="last_task",
        python_callable=nop,
        op_kwargs={"id": 1},
    )

    [first_task, middle_task] >> last_task

Operating System

debian buster

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@matthewblock matthewblock added area:core kind:bug This is a clearly a bug labels Nov 1, 2022
@ephraimbuddy
Contributor

Can you share the dag code? You can redact some values. Without sharing your code it might be difficult to understand what's going on

@uranusjr
Member

uranusjr commented Nov 2, 2022

Without looking at anything, one UPSTREAM_FAILED case specific to a mapped task is when the upstream pushes a non-mappable value (an int, for example). This feels likely for your case.
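
For reference, the kind of situation being described, as a minimal sketch (a hypothetical DAG, not the reporter's code):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


with DAG(
    dag_id="hypothetical_non_mappable_example",
    start_date=datetime(2022, 8, 3),
    schedule=None,
):

    @task
    def bad_upstream():
        return 3  # an int, not a list - not a mappable value

    @task
    def consumer(x):
        print(x)

    # Expanding over a non-mappable XCom value is one known way a mapped task
    # ends up upstream_failed, which is what is being suggested here.
    consumer.expand(x=bad_upstream())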

@matthewblock
Author

matthewblock commented Nov 2, 2022

A few other details I discovered while debugging:

  • The "first" (most upstream) upstream_failed tasks have no start_date or end_date, but the tasks immediately downstream of them that are also upstream_failed do have a start_date and end_date.
  • I noticed that another task that ran successfully, in parallel to the upstream_failed tasks, logged this:
[2022-11-01, 09:15:25 PDT] {mappedoperator.py:622} INFO - Cannot expand <Mapped(MyCustomOperator): my_custom_task_1> for run manual__2022-11-01T16:06:39.483587+00:00; missing upstream values: ['my_xcom_value']
[2022-11-01, 09:15:25 PDT] {mappedoperator.py:622} INFO - Cannot expand <Mapped(MyCustomOperator): my_custom_task_2> for run manual__2022-11-01T16:06:39.483587+00:00; missing upstream values: ['my_xcom_value']
[2022-11-01, 09:15:25 PDT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

Why would the scheduler be trying to expand tasks during another task's execution? Is it because the task downstream has those mapped tasks as upstream? Even so, it shouldn't mark them as upstream_failed...

@matthewblock
Author

Can you share the dag code? You can redact some values. Without sharing your code it might be difficult to understand what's going on

Unfortunately it's proprietary code from my employer, hence the de-ID. I am attempting to create a new test DAG that I can share to reproduce the error.

@matthewblock
Author

Can you share the dag code? You can redact some values. Without sharing your code it might be difficult to understand what's going on

Unfortunately it's proprietary code from my employer, hence the de-ID. I am attempting to create a new test DAG that I can share to reproduce the error.

I was able to successfully create a test case. I also updated the issue language reflecting more research.

At this point I can't imagine why this behavior is desirable but it's likely I'm missing something.

@ephraimbuddy
Contributor

I was not able to reproduce this with your test case. You also have an issue in the test case:

last_task = PythonOperator(
    task_id="last_task",
    python_callable=nop,
)

It should have an op_args argument, since nop takes an argument. It should be this:

last_task = PythonOperator(
    task_id="last_task",
    python_callable=nop,
    op_args=[1],
)

for example

@eladkal eladkal added Can't Reproduce The problem cannot be reproduced area:dynamic-task-mapping AIP-42 affected_version:2.4 Issues Reported for 2.4 and removed area:core labels Nov 3, 2022
@matthewblock
Author

@eladkal - I corrected my test case with op_kwargs.

Do you have AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION set to True or False? The default value True causes this error, but setting it to False prevents the mini scheduler from forcing an expansion too early, and the DAG succeeds.
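
For anyone hitting this in the meantime, setting that variable to False in the scheduler's environment (e.g. the Docker-Compose environment section) is a workaround. A minimal sketch of doing it for a local test, assuming it runs before Airflow loads its configuration:

# Workaround sketch: disable the mini scheduler via the documented environment
# variable. In a real deployment this belongs in the container environment, not code.
import os

os.environ["AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION"] = "False"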

@ephraimbuddy
Contributor

ephraimbuddy commented Nov 3, 2022

@eladkal - I corrected my test case with op_kwargs.

Do you have AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION set to True or False? The default value True causes this error, but setting it to False prevents the mini scheduler from forcing an expansion too early, and the DAG succeeds.

I used the default for AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION which is True.
What database are you using? Did you set any other environment variable other than the above? I can't reproduce it running your updated dag

However, I see that in my case, the second task starts before the first task and they finish together

@romsla

romsla commented Nov 3, 2022

Having the same issue with Airflow 2.4.1.
I got this bug on a Kubernetes installation with 8 schedulers on PostgreSQL, and will try to reproduce the case myself later today.
The first task has to finish faster than the second one; this seems to be the prerequisite.
In my case the first task uses a cached function call, so the first runs are OK, and sometimes later runs are OK too.

@ephraimbuddy
Contributor

@eladkal - I corrected my test case with op_kwargs.
Do you have AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION set to True or False? The default value True causes this error, but setting it to False prevents the mini scheduler from forcing an expansion too early, and the DAG succeeds.

I used the default for AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION which is True. What database are you using? Did you set any other environment variable other than the above? I can't reproduce it running your updated dag

However, I see that in my case, the second task starts before the first task and they finish together

Reproduced with LocalExecutor!

@ephraimbuddy ephraimbuddy removed the Can't Reproduce The problem cannot be reproduced label Nov 3, 2022
@ephraimbuddy
Contributor

It looks like non-expanded mapped task instances don't have an upstream_list, i.e. ti.task.upstream_list is []. I got the test case to work with this:

diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 39722c6df3..f50043c7b6 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -67,6 +67,9 @@ class TriggerRuleDep(BaseTIDep):
     def _get_dep_statuses(self, ti, session, dep_context: DepContext):
         # Checking that all upstream dependencies have succeeded
         if not ti.task.upstream_list:
+            if ti.task.is_mapped and ti.map_index == -1:
+                yield self._failing_status(reason="Task is a mapped task and has not expanded.")
+                return
             yield self._passing_status(reason="The task instance did not have any upstream tasks.")
             return

cc: @uranusjr, would like to hear what you think about this

@uranusjr
Member

uranusjr commented Nov 3, 2022

Something doesn’t click. Why does the non-expanded mapped task not have a proper upstream_list? If it’s not yet expanded, the task should be in the DAG and have proper dependency information set.

@ephraimbuddy
Contributor

Something doesn’t click. Why does the non-expanded mapped task not have a proper upstream_list? If it’s not yet expanded, the task should be in the DAG and have proper dependency information set.

It was removed here

airflow/airflow/models/dag.py

Lines 2240 to 2244 in 3aadc44

for t in dag.tasks:
    # Removing upstream/downstream references to tasks that did not
    # make the cut
    t.upstream_task_ids.intersection_update(dag.task_dict)
    t.downstream_task_ids.intersection_update(dag.task_dict)

when partial_subset was called

partial_dag = task.dag.partial_subset(
    task.downstream_task_ids,
    include_downstream=True,
    include_upstream=False,
    include_direct_upstream=True,
)
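
To illustrate with the task ids from the repro DAG (a standalone sketch, not Airflow code): second_task is not in the partial DAG's task_dict, so the intersection drops middle_task's only upstream reference.

# Standalone illustration of the pruning above, using the repro DAG's task ids.
partial_task_dict = {"first_task": ..., "middle_task": ..., "last_task": ...}  # no second_task

middle_task_upstream_ids = {"second_task"}
middle_task_upstream_ids.intersection_update(partial_task_dict)  # iterates the dict's keys
print(middle_task_upstream_ids)  # set() - middle_task now looks like it has no upstreams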

The removal seems like a bug? Maybe @ashb can chip in.

@ashb
Member

ashb commented Nov 4, 2022

Okay, so the problem here is, as Ephraim identified, that the mini scheduler operates on a subset of the DAG, not the complete DAG.

So when first_task finishes we get a partial DAG containing the following tasks: first_task, last_task, middle_task. Note crucially that we don't have second_task.

The reason we use a partial subset rather than the whole DAG is a performance optimization. Essentially: this task just finished, so let's look at its downstream tasks and see if any of them can be scheduled. But in order to check whether those can be scheduled, we need the upstreams of those tasks (which is how we get to include middle_task).

So I think we have two options here:

  1. Remove include_direct_upstream so that the partial DAG includes all upstreams (see the sketch after this list). Con to this: the mini scheduler does more work (but likely not all that much more).

  2. Change the expansion so that we don't fail mapped tasks if we can't expand when self.dag.partial is True?
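
For reference, a sketch of what option 1 might look like, based on the partial_subset call quoted above (untested, not a proposed patch):

# Option 1 sketch (untested): pull in all upstreams so middle_task keeps its
# reference to second_task and the trigger-rule check sees the real upstream states.
partial_dag = task.dag.partial_subset(
    task.downstream_task_ids,
    include_downstream=True,
    include_upstream=True,  # instead of include_upstream=False + include_direct_upstream=True
)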

@ashb
Member

ashb commented Nov 4, 2022

Oh and the removal is not a bug (at least not in the partial_subset function itself): because include_direct_upstream=True, that function only goes one level up from last_task.

@ephraimbuddy
Contributor

Ok. Thanks Ash, I think option 2 is better:

Change the expansion so that we don't fail mapped tasks if we can't expand when self.dag.partial is True?

Since it will limit the change to mapped tasks. Let me try it.

@potiuk
Member

potiuk commented Nov 4, 2022

Wild idea.

This is a loose thought I've had for quite some time while reviewing some of the issues connected to the mini-scheduler. I think the mini-scheduler "approach" was a good idea - triggering the direct downstream tasks as soon as a task finishes is generally cool. However, it introduces complexity (similar to the above) because it is, IMHO, a tea that is "almost, but not quite, entirely unlike tea", to quote Douglas Adams. It looks like a scheduler scheduling tasks, and it behaves like one, but there are some small quirks (like the above) that make it susceptible to subtle bugs.

Also, I think the original idea that the mini-scheduler would run in a more "distributed" fashion is a bit hampered by the fact that the bulk of what the mini-scheduler does goes through the DB anyway and synchronizes on the DagRun lock and DB operations, so pretty much all the "distribution" benefits are gone. Having multiple schedulers already provides a way to "distribute" scheduling, and distributing it even more does not change much. Yes, it uses the fact that the DAG is already loaded in memory and that some of the DB objects are effectively cached by SQLAlchemy, but I think with 2.4 and the "micro-pipelines" approach, where our users have much better ways to split their DAGs into smaller, independent DAGs, this becomes far less of an issue.

I thought that we could implement "mini-scheduling" slightly differently. Rather than actually doing the scheduling, we could add the DagRun of the just-completed task to some kind of table where we keep "priority" DagRuns to be scheduled by a "real" scheduler. It would then be picked up almost immediately by (one of) the schedulers at the next scheduling loop and scheduled. The latency would be slightly higher, but not by much IMHO, and we would lose the "DAG structure in memory", but I think it would make things a bit more robust - especially when we start introducing new concepts.

It's not entirely hashed out - there are probably some edge cases, for example when we have a lot of those "priority" DagRuns to process (what should we do with the non-priority ones, for example?).

I am mentioning it here because I think such an approach would pretty much automatically solve the above problem.
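
Purely as a strawman for the idea (nothing like this exists today; every name below is hypothetical):

# Hypothetical sketch only - a queue-like table the mini scheduler would insert
# into instead of scheduling directly; the real scheduler would drain it first
# on each loop. Nothing like this exists in Airflow today.
from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table

metadata = MetaData()

priority_dag_run_queue = Table(
    "hypothetical_priority_dag_run_queue",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("dag_id", String(250), nullable=False),
    Column("run_id", String(250), nullable=False),
    Column("queued_at", DateTime, nullable=False),
)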

WDYT?

@ashb
Member

ashb commented Nov 4, 2022

Such a change shouldn't be made without measuring the performance before and after. Yes removing it would make things simpler, but I think a tiny bit of complexity is worth it for the speed boost. (I can't remember the figures off the top of my head, but I know we measured it before adding this, and it did make a difference to the "inter-task lag")

The place where this change really helps is when you have a large number (500-1000+) of DAGs, as the mini scheduler only locks the current DAG, leaving the scheduler free to work on other DAGs.

@potiuk
Member

potiuk commented Nov 4, 2022

Such a change shouldn't be made without measuring the performance before and after. Yes removing it would make things simpler, but I think a tiny bit of complexity is worth it for the speed boost. (I can't remember the figures off the top of my head, but I know we measured it before adding this, and it did make a difference to the "inter-task lag")

Absolutely - I think there might be some effects I have not thought about. I just wanted to bounce the idea and thoughts I had off of you. I do realise there are performance implications if we change it; I just wanted to raise it as an option so that others might also think about it and see it as a possibility :)
