Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task is not retried when worker pod fails to start #16625

Closed
luqic opened this issue Jun 24, 2021 · 9 comments · Fixed by #17819
Closed

Task is not retried when worker pod fails to start #16625

luqic opened this issue Jun 24, 2021 · 9 comments · Fixed by #17819
Labels
kind:bug This is a clearly a bug
Milestone

Comments

@luqic
Copy link

luqic commented Jun 24, 2021

Apache Airflow version: 2.0.2

Kubernetes version:

Client Version: version.Info{Major:"1", Minor:"18", GitVersion:"v1.18.8", GitCommit:"9f2892aab98fe339f3bd70e3c470144299398ace", GitTreeState:"clean", BuildDate:"2020-08-13T16:12:48Z", GoVersion:"go1.13.15", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"17+", GitVersion:"v1.17.17-gke.4900", GitCommit:"2812f9fb0003709fc44fc34166701b377020f1c9", GitTreeState:"clean", BuildDate:"2021-03-19T09:19:27Z", GoVersion:"go1.13.15b4", Compiler:"gc", Platform:"linux/amd64"}
  • Cloud provider or hardware configuration: GKE

What happened:

After the worker pod for the task failed to start, the task is marked as failed with the error message Executor reports task instance <TaskInstance: datalake_db_cdc_data_integrity.check_integrity_core_prod_my_industries 2021-06-14 00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?. The task should have been reattempted as it still has retries left.

{kubernetes_executor.py:147} INFO - Event: datalakedbcdcdataintegritycheckintegritycoreprodmyindustries.17f690ef0328488fadeba2dd00f8175d had an event of type MODIFIED
{kubernetes_executor.py:202} ERROR - Event: datalakedbcdcdataintegritycheckintegritycoreprodmyindustries.17f690ef0328488fadeba2dd00f8175d Failed
{kubernetes_executor.py:352} INFO - Attempting to finish pod; pod_id: datalakedbcdcdataintegritycheckintegritycoreprodmyindustries.17f690ef0328488fadeba2dd00f8175d; state: failed; annotations: {'dag_id': 'datalake_db_cdc_data_integrity', 'task_id': 'check_integrity_core_prod_my_industries', 'execution_date': '2021-06-14T00:00:00+00:00', 'try_number': '1'}
{kubernetes_executor.py:532} INFO - Changing state of (TaskInstanceKey(dag_id='datalake_db_cdc_data_integrity', task_id='check_integrity_core_prod_my_industries', execution_date=datetime.datetime(2021, 6, 14, 0, 0, tzinfo=tzlocal()), try_number=1), 'failed', 'datalakedbcdcdataintegritycheckintegritycoreprodmyindustries.17f690ef0328488fadeba2dd00f8175d', 'prod', '1510796520') to failed
{scheduler_job.py:1210} INFO - Executor reports execution of datalake_db_cdc_data_integrity.check_integrity_core_prod_my_industries execution_date=2021-06-14 00:00:00+00:00 exited with status failed for try_number 1
{scheduler_job.py:1239} ERROR - Executor reports task instance <TaskInstance: datalake_db_cdc_data_integrity.check_integrity_core_prod_my_industries 2021-06-14 00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?

What you expected to happen:

The task status should have been set as up_for_retry instead of failing immediately.

Anything else we need to know:

This error has occurred 6 times over the past 2 months, and to seemingly random tasks in different DAGs. We run 60 DAGs with 50-100 tasks each every 30 minutes. The affected tasks are a mix of PythonOperator and SparkSubmitOperator. The first time we saw it was in mid Apr, and we were on Airflow version 2.0.1. We upgraded to Airflow version 2.0.2 in early May, and the error has occurred 3 more times since then.

Also, the issue where the worker pod cannot start is a common error that we frequently encounter, but in most cases these tasks are correctly marked as up_for_retry and reattempted.

This is currently not a big issue for us since it's so rare, but we have to manually clear the tasks that failed to get them to rerun because the tasks are not retrying. They have all succeeded on the first try after clearing.

Also, I'm not sure if this issue is related to #10790 or #16285, so I just created a new one. It's not quite the same as #10790 because the tasks affected are not ExternalTaskSensors, and also #16285 because the offending lines pointed out there are not in 2.0.2.

Thanks!

@luqic luqic added the kind:bug This is a clearly a bug label Jun 24, 2021
@boring-cyborg
Copy link

boring-cyborg bot commented Jun 24, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@mmazek
Copy link

mmazek commented Jul 26, 2021

We're also impacted by this bug. We're running hundreds of tasks every hour and every day we're ending up with multiple task instances stuck in the queued state. We currently have to clear out the state of all those queued tasks, so that they are picked up again.

Apache Airflow version: 2.0.2

Kubernetes version:
Server Version: version.Info{Major:"1", Minor:"20+", GitVersion:"v1.20.4-eks-6b7464", GitCommit:"6b746440c04cb81db4426842b4ae65c3f7035e53", GitTreeState:"clean", BuildDate:"2021-03-19T19:33:03Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}

@stijndehaes
Copy link
Contributor

stijndehaes commented Aug 10, 2021

We noticed this issue with Airflow 2.1.2. Job went from queued to failed without retry, looking at the code I am not sure how to fix it. It is clear that in scheduler_job.py on line 1238 we see the relevant logs.
Maybe there should be logic here to check if the task needs to be retried and change the state to retried if needed? That logic is currently completely circumvented by just setting the state from the scheduler.
Looking at the code again, a TaskCallbackRequest event is sent to the processor_agent, this will eventually be processed by the function execute_callbacks, that will execute the task instance method handle_failure_with_callback, this one should set the state of the task instance to Up for retry, however this does not happen for some reason. In theory the dagbag could have not contained the dag or task when it was processing the message. But that seems very unlikely

The relevant logs (the dag and task name are erased because they might contain sensitive information):

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|       @timestamp        |                                                                                                                                                                                                                                log                                                                                                                                                                                                                                |
|-------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2021-08-06 17:27:30.210 | [2021-08-06 17:27:30,209] {scheduler_job.py:1254} ERROR - Executor reports task instance <TaskInstance: xxxx 1990-06-10 00:00:00+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?                                                                                                                                                                                      |
| 2021-08-06 17:27:30.205 | [2021-08-06 17:27:30,204] {scheduler_job.py:1218} INFO - Executor reports execution of xxxx execution_date=1990-06-10 00:00:00+00:00 exited with status failed for try_number 5                                                                                                                                                                                                                                                         |
| 2021-08-06 17:27:30.204 | [2021-08-06 17:27:30,204] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='xxxx', task_id='xxxx', execution_date=datetime.datetime(1990, 6, 10, 0, 0, tzinfo=tzlocal()), try_number=5), 'failed', 'xxxx, 'dev', '97113730') to failed                                                                                                                  |
| 2021-08-06 17:27:30.203 | [2021-08-06 17:27:30,202] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: xxxx; state: failed; annotations: {'dag_id': 'xxxx', 'task_id': 'xxxx', 'execution_date': '1990-06-10T00:00:00+00:00', 'try_number': '5'}                                                                                                                                             |
| 2021-08-06 17:22:23.371 | [2021-08-06 17:22:23,371] {scheduler_job.py:1245} INFO - Setting external_id for <TaskInstance: xxxx 1990-06-10 00:00:00+00:00 [queued]> to 1606                                                                                                                                                                                                                                                                                        |
| 2021-08-06 17:22:23.367 | [2021-08-06 17:22:23,367] {scheduler_job.py:1218} INFO - Executor reports execution of xxxx execution_date=1990-06-10 00:00:00+00:00 exited with status queued for try_number 5                                                                                                                                                                                                                                                         |
2021-08-06 17:27:30.205 | [2021-08-06 17:27:30,204] {scheduler_job.py:1218} INFO - Executor reports execution of xxxxr execution_date=1990-06-10 00:00:00+00:00 exited with status failed for try_number 5

Kubernetes version (EKS):
Server Version: version.Info{Major:"1", Minor:"21+", GitVersion:"v1.21.2-eks-0389ca3", GitCommit:"8a4e27b9d88142bbdd21b997b532eb6d493df6d2", GitTreeState:"clean", BuildDate:"2021-07-31T01:34:46Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"linux/amd64"}

@ephraimbuddy
Copy link
Contributor

ephraimbuddy commented Aug 10, 2021

This PR #15929 fixed the issue of not having to clear the task before it can be rerun again and that is a major issue otherwise the task is stuck as explained by @mmazek above

@stijndehaes, can you check the log at logs/scheduler/{CURRENT_DATE}/{DAGFILENAME.py.log} when this happens?

@ashb @jhtimmins, I'm now thinking that this change #15929 should not be released yet. Though it clears tasks from being stuck in the queued/up_for_retry state, it sets them to failed state without checking if they have retries.

I'm wondering if there's a better way to do it?

@stijndehaes
Copy link
Contributor

@ephraimbuddy I have found a way to consistently trigger the issue using the dag attached below.
You first have to just let it run. It will nicely retry and be set to failed after trying twice.
If you clear the task after that it will only try once instead of two times. Not 100% sure if it's the same issue though, but this one is also unexpected and at least is reproducable.

import importlib
from airflow import DAG
from datetime import datetime, timedelta
from kubernetes.client import models as k8s
from airflow.operators.python import PythonOperator


utils = importlib.import_module("sample-python.utils")

default_args = {
    "start_date": datetime.now() - timedelta(days=2),
    "retries": 2,
    "retry_delay": timedelta(seconds=1),
}

sample_python_dag = DAG(
    "retry-failure",
    default_args=default_args,
    schedule_interval="@daily",
)

def my_func(ds, **kwargs):
    return ''

PythonOperator(
    dag=sample_python_dag,
    task_id="task",
    python_callable=my_func,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        command=["FailEarly"],
                    ),
                ],
            )
        ),
    },
)

@stijndehaes
Copy link
Contributor

The log of the scheduler is the following pattern repeated:

[2021-08-06 17:29:52,318] {logging_mixin.py:104} INFO - [2021-08-06 17:29:52,318] {dagbag.py:496} INFO - Filling up the DagBag from /xxx/dag.py
[2021-08-06 17:29:52,465] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
[2021-08-06 17:29:52,467] {logging_mixin.py:104} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
[2021-08-06 17:29:52,501] {scheduler_job.py:642} INFO - DAG(s) dict_keys(['xxxx']) retrieved from /xxx/dag.py
[2021-08-06 17:29:52,551] {logging_mixin.py:104} INFO - [2021-08-06 17:29:52,551] {dag.py:1833} INFO - Sync 1 DAGs
[2021-08-06 17:29:52,569] {logging_mixin.py:104} INFO - [2021-08-06 17:29:52,569] {dag.py:2306} INFO - Setting next_dagrun for xxx to None
[2021-08-06 17:29:52,581] {scheduler_job.py:189} INFO - Processing /xxxx/dag.py took 0.267 seconds
[2021-08-06 17:30:03,012] {scheduler_job.py:181} INFO - Started process (PID=1107) to work on /xxx/dag.py
[2021-08-06 17:30:03,014] {scheduler_job.py:632} INFO - Processing file /xxxx/dag.py for tasks to queue

@collinmcnulty
Copy link
Contributor

@ephraimbuddy So the metadata database is seeing the task as failed (as was reported by the executor) so that's what's showing up in the UI, but the scheduler still thinks it's queued, so it never attempts to retry? Am I understanding the behavior correctly?

If so, making sure that #15929 includes the task retrying if it has retries left will be key. Otherwise will the behavior difference even be noticeable to the user?

@ephraimbuddy
Copy link
Contributor

@ephraimbuddy So the metadata database is seeing the task as failed (as was reported by the executor) so that's what's showing up in the UI, but the scheduler still thinks it's queued, so it never attempts to retry? Am I understanding the behavior correctly?

If so, making sure that #15929 includes the task retrying if it has retries left will be key. Otherwise, will the behavior difference even be noticeable to the user?

I think retrying is a lesser evil to getting stuck in queued. That was why I added #15929 which will be released in 2.1.3

The problem is when the executor reports that this task has failed and the scheduler sees it as queued, without #15929 it gets stuck in queued(even in the UI) and at times in up_for_retry(if it has retry) but never run again. It's also failed in some cases as @luqic said. But if it's stuck in queued, the task has to be cleared as @mmazek said above before it'd run again. See also #13542.

So without #15929, the task state would be set in up_for_retry at times as I found out but won't be run again. I still check for other possible solutions and your suggestion is worth trying

@collinmcnulty
Copy link
Contributor

collinmcnulty commented Sep 15, 2021

I can reproduce this issue like this:

Use this dag on 2.1.1:

from datetime import timedelta

from kubernetes.client import models as k8s

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="pending",
    schedule_interval=None,
    start_date=days_ago(2),
) as dag:
    BashOperator(
        task_id="forever_pending",
        bash_command="date; sleep 30; date",
        retries=3,
        retry_delay=timedelta(seconds=30),
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[
                                k8s.V1VolumeMount(mount_path="/foo/", name="vol")
                            ],)],
                    volumes=[
                        k8s.V1Volume(
                            name="vol",
                            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                                claim_name="missing"
                            ),)],)),},)

And here is the scheduler log from around the failure

[2021-09-15 17:48:56,352] {scheduler_job.py:873} WARNING - Set 1 task instances to state=failed as their associated DagRun was not in RUNNING state
2021-09-15T17:48:56.134716Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:48:56.134808Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.729675760": MODIFY|ATTRIB
[2021-09-15 17:48:47,821] {dagrun.py:429} ERROR - Marking run <DagRun pending @ 2021-09-15 17:43:28.990599+00:00: manual__2021-09-15T17:43:28.990599+00:00, externally triggered: True> failed
[2021-09-15 17:48:47,769] {scheduler_job.py:1258} ERROR - Executor reports task instance <TaskInstance: pending.forever_pending 2021-09-15 17:43:28.990599+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-09-15 17:48:47,769] {scheduler_job.py:1265} INFO - Setting task instance <TaskInstance: pending.forever_pending 2021-09-15 17:43:28.990599+00:00 [queued]> state to failed as reported by executor
[2021-09-15 17:48:47,761] {kubernetes_executor.py:549} INFO - Changing state of (TaskInstanceKey(dag_id='pending', task_id='forever_pending', execution_date=datetime.datetime(2021, 9, 15, 17, 43, 28, 990599, tzinfo=tzlocal()), try_number=1), 'failed', 'pendingforeverpending.cc4a625ffe0d4da88709098daba98d87', 'astronomer-magnificent-aurora-4284', '1751732637') to failed
[2021-09-15 17:48:47,761] {scheduler_job.py:1229} INFO - Executor reports execution of pending.forever_pending execution_date=2021-09-15 17:43:28.990599+00:00 exited with status failed for try_number 1
[2021-09-15 17:48:47,759] {kubernetes_executor.py:372} INFO - Attempting to finish pod; pod_id: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87; state: failed; annotations: {'dag_id': 'pending', 'task_id': 'forever_pending', 'execution_date': '2021-09-15T17:43:28.990599+00:00', 'try_number': '1'}
[2021-09-15 17:48:46,695] {kubernetes_executor.py:149} INFO - Event: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87 had an event of type DELETED
[2021-09-15 17:48:46,695] {kubernetes_executor.py:200} INFO - Event: Failed to start pod pendingforeverpending.cc4a625ffe0d4da88709098daba98d87
[2021-09-15 17:48:46,692] {kubernetes_executor.py:149} INFO - Event: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87 had an event of type MODIFIED
[2021-09-15 17:48:46,692] {kubernetes_executor.py:203} INFO - Event: pendingforeverpending.cc4a625ffe0d4da88709098daba98d87 Pending
[2021-09-15 17:48:46,676] {kubernetes_executor.py:625} ERROR - Pod "pendingforeverpending.cc4a625ffe0d4da88709098daba98d87" has been pending for longer than 300 seconds.It will be deleted and set to failed.
2021-09-15T17:47:50.966665Z info watchFileEvents: notifying
2021-09-15T17:47:47.079744Z info watchFileEvents: notifying
2021-09-15T17:47:40.966397Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:47:40.966527Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB
2021-09-15T17:47:37.079501Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:47:37.079624Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.729675760": MODIFY|ATTRIB
[2021-09-15 17:47:07,909] {scheduler_job.py:1841} INFO - Resetting orphaned tasks for active dag runs
[2021-09-15 17:47:00,347] {scheduler_job.py:1841} INFO - Resetting orphaned tasks for active dag runs
2021-09-15T17:46:35.978572Z info watchFileEvents: notifying
2021-09-15T17:46:25.978277Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:46:25.978421Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB
2021-09-15T17:46:21.074893Z info watchFileEvents: notifying
2021-09-15T17:46:11.074610Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:46:11.074754Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.729675760": MODIFY|ATTRIB
2021-09-15T17:45:11.006936Z info watchFileEvents: notifying
2021-09-15T17:45:01.006688Z info watchFileEvents: "/etc/certs": MODIFY|ATTRIB
2021-09-15T17:45:01.006777Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB
2021-09-15T17:45:01.006787Z info watchFileEvents: "/etc/certs/..2021_09_06_06_43_21.426627327": MODIFY|ATTRIB

kaxil added a commit that referenced this issue Sep 21, 2021
…17819)

When a task fails to start, the executor fails it and its state in
scheduler is queued while its state in executor is failed. Currently
we fail this task without retries to avoid getting stuck.

This PR changes this to only fail the task if the callback cannot be
executed. This ensures the task does not get stuck

closes: #16625

Co-authored-by: Kaxil Naik <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind:bug This is a clearly a bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants