This issue was moved to a discussion.

Task stuck in "scheduled" or "queued" state, pool has all slots queued, nothing is executing #13542

Closed
Overbryd opened this issue Jan 7, 2021 · 145 comments
Labels
affected_version:2.0 Issues Reported for 2.0 kind:bug This is a clearly a bug priority:high High priority bug that should be patched quickly but does not require immediate new release

Comments

@Overbryd

Overbryd commented Jan 7, 2021

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version):

Client Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.3", GitCommit:"1e11e4a2108024935ecfcb2912226cedeafd99df", GitTreeState:"clean", BuildDate:"2020-10-14T12:50:19Z", GoVersion:"go1.15.2", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"17+", GitVersion:"v1.17.14-gke.1600", GitCommit:"7c407f5cc8632f9af5a2657f220963aa7f1c46e7", GitTreeState:"clean", BuildDate:"2020-12-07T09:22:27Z", GoVersion:"go1.13.15b4", Compiler:"gc", Platform:"linux/amd64"}

Environment:

  • Cloud provider or hardware configuration: GKE
  • OS (e.g. from /etc/os-release):
  • Kernel (e.g. uname -a):
  • Install tools:
  • Others:
    • Airflow metadata database is hooked up to a PostgreSQL instance

What happened:

  • Airflow 2.0.0 running on the KubernetesExecutor has many tasks stuck in "scheduled" or "queued" state which never get resolved.
  • The setup has a default_pool of 16 slots.
  • Currently no slots are used (see Screenshot), but all slots are queued.
  • No work is executed any more. The Executor or Scheduler is stuck.
  • There are many many tasks stuck in "scheduled" state
    • Tasks in "scheduled" state say ('Not scheduling since there are %s open slots in pool %s and require %s pool slots', 0, 'default_pool', 1)
      That is simply not true, because there is nothing running on the cluster and there are always 16 tasks stuck in "queued".
  • There are many tasks stuck in "queued" state
    • Tasks in "queued" state say Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
      That is also not true. Nothing is running on the cluster and Airflow is likely just lying to itself. It seems the KubernetesExecutor and the scheduler easily go out of sync.
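
The two messages above fit together if queued task instances occupy pool slots just like running ones. A minimal sketch of that arithmetic, assuming open slots are computed as total minus running minus queued (the function name `open_slots` is illustrative and not taken from Airflow's API):

```python
def open_slots(total_slots: int, running: int, queued: int) -> int:
    """Return the number of free pool slots.

    Assumption: both running and queued task instances occupy a slot,
    which matches the behaviour described in this report.
    """
    return total_slots - running - queued


# The situation from the report: 16 slots, nothing running, 16 queued.
print(open_slots(total_slots=16, running=0, queued=16))  # 0
```

Under that assumption, 16 task instances stuck in "queued" are enough to starve the whole pool even though nothing is executing.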

What you expected to happen:

  • Airflow should resolve scheduled or queued tasks by itself once the pool has available slots
  • Airflow should use all available slots in the pool
  • It should be possible to clear a couple hundred tasks and expect the system to stay consistent

How to reproduce it:

  • Vanilla Airflow 2.0.0 with KubernetesExecutor on Python 3.7.9

  • requirements.txt

    pyodbc==4.0.30
    pycryptodomex==3.9.9
    apache-airflow-providers-google==1.0.0
    apache-airflow-providers-odbc==1.0.0
    apache-airflow-providers-postgres==1.0.0
    apache-airflow-providers-cncf-kubernetes==1.0.0
    apache-airflow-providers-sftp==1.0.0
    apache-airflow-providers-ssh==1.0.0
    
  • The only reliable way to trigger that weird bug is to clear the task state of many tasks at once. (> 300 tasks)

Anything else we need to know:

Don't know, as always I am happy to help debug this problem.
The scheduler/executor seems to go out of sync and never gets back in sync with the state of the world.

We actually planned to upscale our Airflow installation with many more simultaneous tasks. With these severe yet basic scheduling/queuing problems we cannot move forward at all.

Another strange, likely unrelated observation: the scheduler always uses 100% of the CPU. Even with no scheduled or queued tasks, it is always very busy.

Workaround:

The only workaround for this problem I could find so far, is to manually go in, find all tasks in "queued" state and clear them all at once. Without that, the whole cluster/Airflow just stays stuck like it is.
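
The "find all queued tasks" step of this workaround can be sketched as a query. This is only an illustration: it runs against an in-memory SQLite table that stands in for the metadata database, with a trimmed-down `task_instance` table (the real table has more columns), and the helper name `find_stale_queued` is hypothetical.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def find_stale_queued(conn, older_than, now=None):
    """Return (dag_id, task_id) pairs queued for longer than `older_than`."""
    now = now or datetime.now(timezone.utc)
    cutoff = (now - older_than).isoformat()
    rows = conn.execute(
        "SELECT dag_id, task_id FROM task_instance "
        "WHERE state = 'queued' AND queued_dttm < ? "
        "ORDER BY dag_id, task_id",
        (cutoff,),
    )
    return rows.fetchall()


# Demo against an in-memory stand-in for the metadata database.
# Timestamps are stored as ISO-8601 UTC strings in this stand-in.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, state TEXT, queued_dttm TEXT)"
)
now = datetime(2021, 1, 7, 12, 0, tzinfo=timezone.utc)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("etl", "load", "queued", (now - timedelta(hours=3)).isoformat()),
        ("etl", "extract", "queued", (now - timedelta(minutes=5)).isoformat()),
        ("etl", "report", "running", (now - timedelta(hours=4)).isoformat()),
    ],
)
stale = find_stale_queued(conn, older_than=timedelta(hours=2), now=now)
print(stale)  # [('etl', 'load')]
```

The age cutoff is a design choice: it avoids touching tasks that were queued only moments ago and are about to start normally.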

@Overbryd Overbryd added the kind:bug This is a clearly a bug label Jan 7, 2021
@Overbryd Overbryd changed the title Task stuck in "scheduled" or "queued" state, pool has open slots Task stuck in "scheduled" or "queued" state, pool has all slots queued, nothing is executing Jan 7, 2021
@potiuk potiuk added this to the Airflow 2.0.1 milestone Jan 7, 2021
@potiuk potiuk added the priority:medium Bug that should be fixed before next release but would not block a release label Jan 7, 2021
@mfjackson
Contributor

@Overbryd I'm not sure if this helps, but I was dealing with a similar issue with our self-managed Airflow instance on GKE when we upgraded to 2.0.0 a couple of weeks ago. Are you using gitSync? If so, are you receiving an error in your logs surrounding the knownHosts?

If so, we found that updating the pod-template-file to this:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
{{- if .Values.dags.gitSync.enabled }}
  initContainers:
{{- include "git_sync_container" (dict "Values" .Values "is_init" "true") | indent 8 }}
{{- end }}
  containers:
    - args: []
      command: []
      envFrom:
      {{- include "custom_airflow_environment_from" . | default "\n  []" | indent 6 }}
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
{{- include "standard_airflow_environment" . | indent 6}}
{{- include "custom_airflow_environment" . | indent 6 }}
      image: {{ template "pod_template_image" . }}
      imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
      name: base
      ports: []
      volumeMounts:
        - mountPath: {{ template "airflow_logs" . }}
          name: airflow-logs
        - name: config
          mountPath: {{ template "airflow_config_path" . }}
          subPath: airflow.cfg
          readOnly: true
{{- if .Values.scheduler.airflowLocalSettings }}
        - name: config
          mountPath: {{ template "airflow_local_setting_path" . }}
          subPath: airflow_local_settings.py
          readOnly: true
{{- end }}
{{- if .Values.dags.gitSync.knownHosts }}
        - mountPath: /etc/git-secret/known_hosts
          name: config
          subPath: known_hosts
          readOnly: true
{{- end }}
{{- if .Values.dags.gitSync.sshKeySecret }}
        - mountPath: /etc/git-secret/ssh
          name: git-sync-ssh-key
          subPath: ssh
{{- end }}
{{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}
        - mountPath: {{ include "airflow_dags_mount_path" . }}
          name: dags
          readOnly: true
{{- if .Values.dags.persistence.enabled }}
          subPath: {{.Values.dags.gitSync.dest }}/{{ .Values.dags.gitSync.subPath }}
{{- end }}
{{- end }}
  hostNetwork: false
  {{- if or .Values.registry.secretName .Values.registry.connection }}
  imagePullSecrets:
    - name: {{ template "registry_secret" . }}
  {{- end }}
  restartPolicy: Never
  securityContext:
    runAsUser: {{ .Values.uid }}
  nodeSelector: {{ toYaml .Values.nodeSelector | nindent 4 }}
  affinity: {{ toYaml .Values.affinity | nindent 4 }}
  tolerations: {{ toYaml .Values.tolerations | nindent 4 }}
  serviceAccountName: '{{ .Release.Name }}-worker'
  volumes:
  {{- if .Values.dags.persistence.enabled }}
  - name: dags
    persistentVolumeClaim:
      claimName: {{ template "airflow_dags_volume_claim" . }}
  {{- else if .Values.dags.gitSync.enabled }}
  - name: dags
    emptyDir: {}
  {{- end }}
  {{- if and  .Values.dags.gitSync.enabled  .Values.dags.gitSync.sshKeySecret }}
{{- include "git_sync_ssh_key_volume" . | indent 2 }}
  {{- end }}
  - emptyDir: {}
    name: airflow-logs
  - configMap:
      name: {{ include "airflow_config" . }}
    name: config

The changes to the original file are as follows:

  • Line 56: set "name: config"
  • Line 58: insert "readOnly: true"
  • Delete lines 98-103

This allowed gitSync to work (after also passing a knownHosts value in the values.yaml file, along with the other necessary configurations) and processes to move from a persistent "queued" state to a "running" state.

If this isn't the issue, the other area that you may want to look into is making sure that your service account binding annotations are properly set for your scheduler, webserver, and workers in your values.yaml file. If they're not properly bound to a GCP service account with adequate pod creation permissions, the initial task may queue, but won't run.

@vikramkoka vikramkoka added provider:cncf-kubernetes Kubernetes provider related issues affected_version:2.0 Issues Reported for 2.0 labels Jan 14, 2021
@kaxil
Member

kaxil commented Jan 18, 2021

@Overbryd Did the suggestion in the above comment help?

@kaxil kaxil removed this from the Airflow 2.0.1 milestone Jan 25, 2021
@jonathonbattista

jonathonbattista commented Jan 31, 2021

We are experiencing these symptoms on 1.10.14 as well. Doesn't seem git-sync related.

120 tasks are supposedly running with 0 slots open, but nothing is spawning.

Does anyone have a clue why this is happening?

@Overbryd
Author

Overbryd commented Feb 7, 2021

@kaxil first of all, sorry for my late reply. I am still active on this issue, just so you know. I have been quite busy unfortunately.

You asked whether this might be git-sync related.
I can state that it is not git-sync related.
I observed this issue with local clusters (Dags loaded using Docker volume mounts) as well as with installations that pack the code into the container.

It's a bit hard to track down precisely, and I could only ever see it when using KubernetesExecutor.
If I ever observe it again, I'll try to reproduce it more precisely (currently that cluster is running a low volume of jobs regularly, therefore all is smooth sailing so far).

The only way I could trigger it manually once (as stated in my original post) was when I was clearing a large number of tasks at once.

@pageldev
Contributor

pageldev commented Feb 8, 2021

I'm seeing this behaviour as well. I could not reliably reproduce it, but my experience matches that of @Overbryd - especially when clearing many tasks that can run in parallel. I noticed that the indefinitely queued tasks produce an error in the scheduler log:

[2021-01-20 17:17:52,393] {base_executor.py:82} ERROR - could not queue task ==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2)

Looking further at the log, I notice that the task gets processed normally first, but then gets picked up again, leading to the mentioned error.

[2021-01-20 17:16:18,454] {scheduler_job.py:1100} INFO - Sending ==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2) to executor with priority 1 and queue celery
[2021-01-20 17:16:18,454] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', '==dag-id==', '==task-id==', '2021-01-12T17:39:37.661475+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/dag.py']
[2021-01-20 17:16:18,469] {kubernetes_executor.py:532} INFO - Add task ==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2) with command [] with executor_config {}
[2021-01-20 17:16:18,498] {kubernetes_executor.py:299} INFO - Kubernetes job is (==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2)
[2021-01-20 17:16:25,158] {kubernetes_executor.py:568} INFO - Changing state of (==TaskInstanceKey== tzinfo=tzutc()), try_number=2), None, '==pod-id==', 'airflow', '8928202') to None
[2021-01-20 17:16:25,192] {kubernetes_executor.py:617} INFO - Deleted pod: ==TaskInstanceKey== tzinfo=tzutc()), try_number=2) in namespace airflow
[2021-01-20 17:16:25,357] {kubernetes_executor.py:568} INFO - Changing state of (==TaskInstanceKey== tzinfo=tzutc()), try_number=2), None, '==pod-id==', 'airflow', '8928203') to None
[2021-01-20 17:16:25,363] {kubernetes_executor.py:617} INFO - Deleted pod: ==TaskInstanceKey== tzinfo=tzutc()), try_number=2) in namespace airflow
[2021-01-20 17:16:25,364] {kubernetes_executor.py:568} INFO - Changing state of (==TaskInstanceKey== tzinfo=tzutc()), try_number=2), None, '==pod-id==', 'airflow', '8928204') to None
[2021-01-20 17:16:25,369] {kubernetes_executor.py:617} INFO - Deleted pod: ==TaskInstanceKey== tzinfo=tzutc()), try_number=2) in namespace airflow
# ⬇ task gets picked up again
[2021-01-20 17:17:30,649] {scheduler_job.py:1100} INFO - Sending ==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2) to executor with priority 1 and queue celery
[2021-01-20 17:17:30,649] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', '==dag-id==', '==task-id==', '2021-01-12T17:39:37.661475+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/dags/dag.py']
[2021-01-20 17:17:30,655] {kubernetes_executor.py:532} INFO - Add task ==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2) with command 
[2021-01-20 17:17:30,658] {kubernetes_executor.py:299} INFO - Kubernetes job is (==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2), 
[2021-01-20 17:17:52,389] {scheduler_job.py:1058} INFO - Setting the following tasks to queued state:
    <TaskInstance: ==TaskInstanceKey== 2021-01-12 17:39:37.661475+00:00 [scheduled]>
[2021-01-20 17:17:52,392] {scheduler_job.py:1100} INFO - Sending ==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2) to executor with priority 1 and queue celery
[2021-01-20 17:17:52,393] {base_executor.py:82} ERROR - could not queue task ==TaskInstanceKey== tzinfo=Timezone('UTC')), try_number=2)
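
One possible reading of this log is that the executor refuses to queue any key it still tracks as queued or running. The toy model below is a sketch under that assumption; `ToyExecutor` and its methods are simplified stand-ins written for illustration, not Airflow's classes.

```python
class ToyExecutor:
    """Simplified stand-in for an executor's bookkeeping (not Airflow's class)."""

    def __init__(self):
        self.queued_tasks = {}  # key -> command waiting to be launched
        self.running = set()    # keys the executor believes are in flight

    def queue_command(self, key, command):
        """Queue a command unless the key is already tracked; return success."""
        if key not in self.queued_tasks and key not in self.running:
            self.queued_tasks[key] = command
            return True
        # Corresponds to the "could not queue task" error in the log above.
        return False

    def launch(self, key):
        """Move a queued key into the running set."""
        self.queued_tasks.pop(key)
        self.running.add(key)


executor = ToyExecutor()
key = ("dag", "task", "2021-01-12T17:39:37", 2)

first = executor.queue_command(key, ["airflow", "tasks", "run"])
executor.launch(key)
# The pod is deleted, but nothing removes the key from `running`.
second = executor.queue_command(key, ["airflow", "tasks", "run"])
print(first, second)  # True False
```

If a key is never removed from the running set after its pod disappears, every later attempt to queue the same task instance is rejected, which would leave it "queued" in the database indefinitely.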

@nikitarulz

This is happening with Celery executor as well.
I'm using Airflow 2.0.0 with Celery executor and mysql, facing similar issue.
Sorry for the basic question, but I'm unable to figure out how to manually find all tasks in the "queued" state and clear them.
Can somebody help here?

@jonathonbattista

This is probably not helpful because you are on 2.x but our solution was to set AIRFLOW__SCHEDULER__RUN_DURATION which will restart the scheduler every x hours.

You could probably achieve something similar tho.

@renanleme

renanleme commented Mar 21, 2021

I had the same problem today and I think I found the problem.

I'm testing with:
Apache Airflow version: 2.0.1
Executor: Celery
Running locally

I was testing one DAG, and after changing a few parameters in one of the tasks in the DAG file and clearing the tasks, the task got stuck in the scheduled state.
The problem: The changes that I made broke the task, something like this:

airflow-worker_1     | airflow.exceptions.AirflowException: Invalid arguments were passed to GoogleCloudStorageToBigQueryOperator (task_id: load_dag). Invalid arguments were:
airflow-worker_1     | **kwargs: {'gcp_conn_id': 'bigquery_default'}

So, the worker was refusing to execute because I was passing an invalid argument to the task. The problem is that the worker doesn't notify the scheduler/web that the file is wrong (or update the task status to running); no alert of a broken DAG was shown on the Airflow home page.

After updating the task parameter and clearing the task, it ran successfully.

PS: This is probably not the same problem that the OP is having, but it is related to tasks stuck in the scheduled state.

@RicardoPedrotti

I'm facing the same issue as OP and unfortunately what @renanleme said does not apply to my situation.
I read a lot about the AIRFLOW__SCHEDULER__RUN_DURATION that @jonathonbattista mentioned to be the solution, but first of all, this config does not appear to be implemented in airflow 2.X and second, as the Scheduler documentation says:

scheduler is designed to run as a persistent service in an Airflow production environment.

I wouldn't mind restarting the scheduler, but the reason for the hanging queued tasks is not clear to me. In my environment, it appears to be very random.

@Overbryd
Author

Overbryd commented Mar 31, 2021

Back on this. I am currently observing the behaviour again.

I can confirm:

  • The solution description of @renanleme does not apply to my case. Definitely not.
  • Restarting the scheduler is a workaround to the problem

The issue persists with 2.0.1; please update the tag accordingly.

The issue is definitely "critical" as it halts THE ENTIRE airflow operation...!

@uranusjr
Member

uranusjr commented Mar 31, 2021

Is this related to #14924?

@kaxil kaxil added this to the Airflow 2.0.3 milestone Mar 31, 2021
@ephraimbuddy
Contributor

I have replicated this, will be working on it

@ephraimbuddy ephraimbuddy self-assigned this Apr 1, 2021
@ephraimbuddy
Contributor

Unassigning myself as I can't reproduce the bug again.

@ephraimbuddy ephraimbuddy removed their assignment Apr 1, 2021
@SalmonTimo

SalmonTimo commented Apr 2, 2021

I ran into this issue due to the scheduler over-utilizing CPU because our min_file_process_interval was set to 0 (the default prior to 2.0), which in airflow 2.0 causes 100% CPU utilization by constantly searching for new DAG files. Setting this parameter to 60 fixed the issue.

The stack I observed this on:
host: AWS ECS Cluster
executor: CeleryExecutor
queue: AWS SQS Queue

The behavior I observed was that the scheduler would mark tasks as "queued", but never actually send them to the queue. I think the scheduler does the actual queueing via the executor, so I suspect that the executor is starved of resources and unable to queue the new task. My manual workaround until correcting the min_file_process_interval param was to stop the scheduler, clear queued tasks, and then start a new scheduler. The new scheduler would temporarily send tasks to the queue properly, before degenerating to marking tasks as queued without sending them to the queue.

I suspect the OP may have this same issue because they mentioned having 100% CPU utilization on their scheduler.

@kaxil
Member

kaxil commented Apr 2, 2021

We have changed the default min_file_process_interval to 30 from Airflow 2.0.1

https://github.com/apache/airflow/blob/8cc8d11fb87d0ad5b3b80907874f695a77533bfa/UPDATING.md#default-scheduler-min_file_process_interval-is-changed-to-30

For existing deployments, users will have to change this manually in their airflow.cfg.

I wouldn't expect that to be the cause of tasks staying in the queued state, but it is worth a try.
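
For deployments that are configured through environment variables rather than airflow.cfg, the same option can be overridden using the `AIRFLOW__{SECTION}__{KEY}` naming that appears elsewhere in this thread. A small sketch of that mapping (the helper name `airflow_env_var` is made up for illustration):

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides an airflow.cfg option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


print(airflow_env_var("scheduler", "min_file_process_interval"))
# AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL
```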

@girishrd2004

I had the task stuck because I changed the DAG id as part of my code change. After reverting the DAG id change, things worked fine. However the old queued tasks had to be manually marked complete.

@lukas-at-harren

lukas-at-harren commented Apr 6, 2021

@kaxil I have checked, min_file_process_interval is set to 30, however the problem is still there for me.

@SalmonTimo I have a pretty high CPU utilisation (60%), albeit the scheduler settings are default. But why? Does this matter?

––

Same issue, new day: I have Airflow running, the scheduler running, but the whole cluster has 103 scheduled tasks and 3 queued tasks, but nothing is running at all. I highly doubt that min_file_process_interval is the root of the problem.
I suggest somebody mark that issue with a higher priority, I do not think that "regularly restarting the scheduler" is a reasonable solution.

--

What we need here is some factual inspection of the Python process.
I am no Python expert, however I am proficient and know my way around other VMs (Erlang, Ruby).

Following that stack trace idea, I just learned that Python cannot dump a process (https://stackoverflow.com/a/141826/128351), unfortunately, otherwise I would have provided you with such a process dump of my running "scheduler".

I am very happy to provide you with some facts about my stalled scheduler, if you tell me how you would debug such an issue.

What I currently have:

  • CPU utilisation of the scheduler is still pretty high (around 60%).
  • AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL is set to 30
  • AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL is set to 10
  • Log output of scheduler:
[2021-04-06 05:19:56,201] {scheduler_job.py:1063} INFO - Setting the following tasks to queued state:

[2021-04-06 05:19:57,865] {scheduler_job.py:941} INFO - 15 tasks up for execution:

# ... snip ...

[2021-04-06 05:19:57,876] {scheduler_job.py:975} INFO - Figuring out tasks to run in Pool(name=mssql_dwh) with 0 open slots and 15 task instances ready to be queued
[2021-04-06 05:19:57,882] {scheduler_job.py:985} INFO - Not scheduling since there are 0 open slots in pool mssql_dwh

What I find striking, is the message INFO - Not scheduling since there are 0 open slots in pool mssql_dwh.
That is a pool configured for max 3 slots. However, not a single task is running. I fear the bug is that the "scheduler" might be losing track of running tasks on Kubernetes. Bluntly, I guess there is a bug in the components:

  • Scheduler
  • KubernetesExecutor
  • Pools

@kaxil
Member

kaxil commented Apr 6, 2021

@lukas-at-harren -- Can you check the Airflow Webserver -> Admin -> Pools and then in the row with your pool (mssql_dwh) check the Used slots. And click on the number in Used slots, it should take you to the TaskInstance page that should show the currently "running" taskinstances in that Pool.

It is possible that they are not actually running but somehow got in that state in DB. If you see 3 entries over here, please mark those tasks as success or failed, that should clear your pool.
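
The check described above amounts to comparing what the database believes occupies the pool with what is actually alive on the cluster. A rough sketch of that comparison, using a hypothetical trimmed-down row type and made-up task names (how the set of live tasks is obtained is left out and depends on the executor):

```python
from typing import Iterable, List, NamedTuple, Set


class TaskRow(NamedTuple):
    """Hypothetical, trimmed-down view of a task instance row."""
    task_id: str
    pool: str
    state: str


def phantom_occupants(
    rows: Iterable[TaskRow], pool: str, live_task_ids: Set[str]
) -> List[str]:
    """Return task ids that hold a slot in `pool` but have no live worker."""
    return [
        row.task_id
        for row in rows
        if row.pool == pool
        and row.state in ("running", "queued")
        and row.task_id not in live_task_ids
    ]


rows = [
    TaskRow("load_a", "mssql_dwh", "running"),
    TaskRow("load_b", "mssql_dwh", "running"),
    TaskRow("load_c", "mssql_dwh", "queued"),
    TaskRow("other", "default_pool", "running"),
]
# Nothing is actually executing on the cluster.
print(phantom_occupants(rows, "mssql_dwh", live_task_ids=set()))
# ['load_a', 'load_b', 'load_c']
```

Any task id returned here is a candidate for being marked success or failed by hand so that the pool's slots are released.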


@danmactough
Contributor

There is also 2.2.2 as of recently. Can you please upgrade and check it there @danmactough?

Wow @potiuk I totally missed that update! Huge news! I'll check that out and see if it helps.

@WattsInABox

@danmactough Did this update help? We're tracking this issue before upgrading.

@danmactough
Contributor

@danmactough Did this update help? We're tracking this issue before upgrading.

@WattsInABox Unfortunately, the AWS MWAA "upgrade" path is not an upgrade path, but rather a "stand up a new cluster and move all your DAGs with no way to retain historical Airflow metadata" path. Which ok fine, maybe that's what we should do in any case, but it does mean testing out 2.2.2 is going to require some planning.

@WattsInABox

@danmactough ouch! Thanks for responding to a rando, we appreciate it :)

@DVerzal

DVerzal commented Feb 23, 2022

We've upgraded to a 2.2.2 MWAA environment and are encountering the similar queuing behavior. Tasks remain in the queued state for about fifteen minutes before executing. This is in an extremely small dev environment we're testing. Unfortunately, even the unstick_tag task remains in a queued state.

@potiuk
Member

potiuk commented Feb 27, 2022

We've upgraded to a 2.2.2 MWAA environment and are encountering the similar queuing behavior. Tasks remain in the queued state for about fifteen minutes before executing. This is in an extremely small dev environment we're testing. Unfortunately, even the unstick_tag task remains in a queued state.

@DVerzal I propose you follow https://airflow.apache.org/docs/apache-airflow/stable/faq.html?highlight=faq#why-is-task-not-getting-scheduled, review the resources and configuration of your Airflow, and (if this does not help) open a new issue with detailed information on your configuration and logs. It is likely a different problem from the one tracked in this issue.

@potiuk
Member

potiuk commented Feb 27, 2022

Also, I suggest opening an issue with MWAA support; maybe this is simply some problem with the MWAA configuration.

@DVerzal

DVerzal commented Mar 1, 2022

Thanks for pointing me in the right direction, @potiuk. We're planning to continue with our investigation when some resources free up to continue the migration.

@haninp

haninp commented Mar 7, 2022

Hello, I found the same issue when using version 2.2.4 (latest).
Is there a workaround for this?

@potiuk
Member

potiuk commented Mar 7, 2022

Hello, I found the same issue when using version 2.2.4 (latest). Is there a workaround for this?

@haninp - this might be (and likely is, because MWAA, which plays a role here, has no 2.2.4 support yet) a completely different issue. It's not helpful to say "I also have a similar problem" without specifying details or logs.

As a "workaround" (or diagnosis) I suggest you follow this FAQ here: https://airflow.apache.org/docs/apache-airflow/stable/faq.html?highlight=faq#why-is-task-not-getting-scheduled and double check whether your problem is one of the configuration problems explained there.

If you find you still have a problem, then I invite you to describe it in detail in a separate issue (if this is something that is easily reproducible) or GitHub Discussion (if you have a problem but are unsure how to reproduce it). Providing as many details as possible, such as your deployment details, logs, circumstances etc., is crucial to be able to help you. Just stating "I also have this problem" helps no one (including yourself, because you might think you delegated the problem and it will be solved, when in fact this might be a completely different problem).

@tothandor

Hi All! With release 2.2.5 scheduling issues have gone away for me.
I guess one of the following resolved issues fixed it.

  1. Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
  2. Fix race condition between triggerer and scheduler (#21316)

I am still using mostly SubDags instead of TaskGroups, since the latter makes the tree view incomprehensible. If you have a similar setup, then give 2.2.5 release a try!

@victor-mariano-leite

victor-mariano-leite commented Apr 13, 2022

For those having problems with MWAA: I hit this error today and couldn't wait for the 2.2.5 release in MWAA to finish my company's migration project. We have 17 DAGs, with a median of 8-9 tasks each (one has 100+ tasks: for each table in our DB it runs an import/CDC task and a validation task), all of them running once a day at night.

I went a bit extreme with reducing the load on the scheduler, and looks like it's working properly (for our current use cases, scale) after a few tests today.

If anyone wants to experiment and is having the same problem with similar settings, here are the configurations I've changed, using mw1.medium:

Configuration                               Value
celery.sync_parallelism                     1
celery.worker_autoscale                     5,5
core.min_serialized_dag_fetch_interval      20
core.min_serialized_dag_update_interval     60
scheduler.catchup_by_default                False
scheduler.job_heartbeat_sec                 10
scheduler.max_dagruns_per_loop_to_schedule  10
scheduler.max_dagruns_to_create_per_loop    5
scheduler.max_tis_per_query                 128
scheduler.min_file_process_interval         60
scheduler.parsing_processes                 1
scheduler.scheduler_idle_sleep_time         5

@ktleung2017

#21455 (comment)
The fix here solves our stuck in "scheduled" problem. If you are using 2.2.2, you may give it a try.

@mwoods-familiaris

mwoods-familiaris commented May 6, 2022

Also experiencing this issue on MWAA 2.2.2. Seeing the same pattern as another commenter, in that when a DAG gets "stuck" with the first task in a queued state, it takes 15 minutes to sort itself out.

Our MWAA 2.0.2 instances never exhibited this behavior.

Has anyone had any luck in finding a workaround/fix suitable for an MWAA 2.2.2 mw1.small instance (i.e. something that doesn't involve upgrading to a later Airflow version)?

UPDATE: for anyone using MWAA v2.2.2 who is experiencing the issue of tasks being stuck in a "queued" state for 15 minutes even when the worker pool has no tasks being executed, what has worked for us is to set the "celery.pool" configuration option to "solo". This resolved the issue for us immediately, though may have some knock-on impact in terms of worker throughput, so you may need to scale workers accordingly in some situations.

@tal181
Contributor

tal181 commented May 19, 2022

From the Airflow docs:
Re-run Tasks
Some of the tasks can fail during the scheduled run. Once you have fixed the errors after going through the logs, you can re-run the tasks by clearing them for the scheduled date. Clearing a task instance doesn't delete the task instance record. Instead, it updates max_tries to 0 and sets the current task instance state to None, which causes the task to re-run.
Click on the failed task in the Tree or Graph views and then click on Clear. The executor will re-run it.

Taking @val2k's script and changing max_tries to 0 and state to None fixed the script for us:

import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.operators.python import PythonOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.state import State
from sqlalchemy.sql.expression import or_

@provide_session
def unstick_dag_callable(session, **kwargs):
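    # Select task instances that have been queued (or stateless) for over 2 hours.
    # Named `stuck_filter` to avoid shadowing the built-in `filter`.
    stuck_filter = [
        or_(TaskInstance.state == State.QUEUED, TaskInstance.state == State.NONE),
        TaskInstance.queued_dttm < datetime.now(timezone.utc) - timedelta(hours=2)
    ]

    tis = session.query(TaskInstance).filter(*stuck_filter).all()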
    logging.info(f"Task instances: {tis}")
    logging.info(f"Updating {len(tis)} task instances")

    task_ids = []
    for ti in tis:

        task_ids.append(f"dag_id : {ti.dag_id},task_id : {ti.task_id}, execution_date : {ti.execution_date}")

        try:
            dr = (
                session.query(DagRun)
                    .filter(DagRun.run_id == ti.dag_run.run_id)
                    .first()
            )

            dagrun = {}

            if dr:
                dagrun = dict(
                    id=dr.id,
                    dag_id=dr.dag_id,
                    execution_date=dr.execution_date,
                    start_date=dr.start_date,
                    end_date=dr.end_date,
                    _state=dr._state,
                    run_id=dr.run_id,
                    creating_job_id=dr.creating_job_id,
                    external_trigger=dr.external_trigger,
                    run_type=dr.run_type,
                    conf=dr.conf,
                    last_scheduling_decision=dr.last_scheduling_decision,
                    dag_hash=dr.dag_hash,
                )

            logging.info(
                dict(
                    task_id=ti.task_id,
                    job_id=ti.job_id,
                    key=ti.key,
                    dag_id=ti.dag_id,
                    execution_date=ti.execution_date,
                    state=ti.state,
                    dag_run={**dagrun},
                )
            )



            ti.max_tries = 0
            ti.state = None
            session.merge(ti)
        except Exception:
            # logging.exception records the traceback along with the message.
            logging.exception("Failed to reset task instance %s", ti)
    session.commit()


    logging.info("Done.")


def clear_task(session, ti):
    clear_task_instances(tis=[ti],
                         session=session,
                         activate_dag_runs=True,
                         dag=None)


with DAG(
    "retry_dag",
    description="Utility DAG to fix TaskInstances stuck in queued state",
    default_args=WORKFLOW_ARGS,
    schedule_interval="*/10 * * * *",
    start_date=datetime(year=2021, month=8, day=1),
    max_active_runs=1,
    catchup=False,
    is_paused_upon_creation=True,
) as dag:
    PythonOperator(task_id="unstick_dag", python_callable=unstick_dag_callable)
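
The selection rule used by `unstick_dag_callable` can be sketched in plain Python to make its edge cases visible. This is only an illustration and not Airflow code: `is_stuck`, its parameters, and the `QUEUED` constant are hypothetical names, and states are modelled as plain strings with `None` standing in for "no state". One thing it surfaces: in SQL, a comparison against a NULL `queued_dttm` is never true, so task instances that were never queued are not selected by the query above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical stand-in for State.QUEUED; None stands in for State.NONE.
QUEUED = "queued"


def is_stuck(
    state: Optional[str],
    queued_dttm: Optional[datetime],
    now: datetime,
    threshold: timedelta = timedelta(hours=2),
) -> bool:
    """Mirror the query's filter: state is queued or unset, and queued more than `threshold` ago."""
    if state not in (QUEUED, None):
        return False
    # A NULL queued_dttm never satisfies `queued_dttm < cutoff` in SQL,
    # so such rows are excluded from the result.
    if queued_dttm is None:
        return False
    return queued_dttm < now - threshold


now = datetime(2021, 8, 1, 12, 0, tzinfo=timezone.utc)
print(is_stuck(QUEUED, now - timedelta(hours=3), now))     # True: queued 3 hours ago
print(is_stuck(QUEUED, now - timedelta(minutes=30), now))  # False: queued 30 minutes ago
print(is_stuck(None, None, now))                           # False: never queued
```

If tasks with no state and no `queued_dttm` should also be reset, the query would need a separate condition for them; whether that is desirable depends on the deployment.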

@mattvonrocketstein

Question for other MWAA users: have you tried setting max-workers == min-workers, which basically disables autoscaling? Is anyone without autoscaling actually seeing this, regardless of Airflow version?

We've also talked to the MWAA team and haven't heard clear answers about whether messages/workers are properly drained when downscaling. I'm wondering whether that is the crux of this issue: queue state becoming inconsistent because of race conditions during improper worker shutdown. As the MWAA backend is pretty opaque to end users, it's possible that downscaling is nothing more careful than terminating an EC2 worker, a Fargate pod, or whatever backs the worker. However, I don't know much about Airflow/Celery internals as far as redelivery, dead-letter queues, etc., so I might be way off base here.

Since this is something that arguably could or should be fixed in a few different places (the MWAA core infrastructure, the Celery codebase, or the Airflow codebase), it seems likely that the problem will stick around for a while, along with the confusion about which versions are affected. The utility DAGs in this thread are an awesome reference ❤️, and it may come to that, but we're still hoping for a different workaround. An Airflow version upgrade would also leave us with a big stack of things to migrate, and we can't jump into that immediately. Without autoscaling we can expect things to get more expensive, but we're thinking it may be worth it at this point to buy more stability. Anyone got more info?

@Gelerion

@mattvonrocketstein
I can confirm the issue is still present with autoscaling disabled.

In our case, we create DAGs dynamically, so the MWAA team's first suggestion was to reduce the load on the scheduler by increasing the DAG refresh interval. It seems to help: we see fewer errors in the logs and tasks get stuck less often, but it didn't resolve the issue.

Now we are waiting for the second round of suggestions.

@dashton90
Contributor

Specific to MWAA, our team had a similar issue. We traced it to the MWAA default value of celery.worker_autoscale. The value varies by environment class, but is set to 5 * vCPU per worker. This overloads the workers and causes resources to lock.

Workers have a different number of vCPUs per environment. Defaults are:

| Environment Class | Worker CPU | Default Value |
|---|---|---|
| mw1.small | 1 vCPU | celery.worker_autoscale=5,5 |
| mw1.medium | 2 vCPU | celery.worker_autoscale=10,10 |
| mw1.large | 4 vCPU | celery.worker_autoscale=20,20 |

Our resolution was to lower the value of celery.worker_autoscale to 2 * vCPU per worker.

A manageable value per environment class is:

| Environment Class | Parameter |
|---|---|
| mw1.small | celery.worker_autoscale=2,2 |
| mw1.medium | celery.worker_autoscale=4,4 |
| mw1.large | celery.worker_autoscale=8,8 |
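
The rule of thumb above (2 * vCPU per worker, with the maximum equal to the minimum) can be written as a small helper. This is a sketch, not an MWAA or Airflow API: the function name `worker_autoscale`, the `tasks_per_vcpu` parameter, and the `VCPUS_BY_CLASS` mapping are illustrative, with the vCPU counts taken from the first table.

```python
def worker_autoscale(vcpus: int, tasks_per_vcpu: int = 2) -> str:
    """Return a "max,min" value for celery.worker_autoscale with max == min."""
    if vcpus < 1 or tasks_per_vcpu < 1:
        raise ValueError("vcpus and tasks_per_vcpu must be positive")
    concurrency = vcpus * tasks_per_vcpu
    return f"{concurrency},{concurrency}"


# vCPU counts per environment class, as listed in the table of defaults.
VCPUS_BY_CLASS = {"mw1.small": 1, "mw1.medium": 2, "mw1.large": 4}

for env_class, vcpus in VCPUS_BY_CLASS.items():
    print(f"{env_class}: celery.worker_autoscale={worker_autoscale(vcpus)}")
```

Passing `tasks_per_vcpu=5` reproduces the defaults from the first table, which makes it easy to compare the two settings side by side.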

@potiuk removed this from the Airflow 2.3.3 milestone on Jun 4, 2022
@apache locked and limited conversation to collaborators on Jun 4, 2022
@potiuk converted this issue into discussion #24180 on Jun 4, 2022
