Replies: 4 comments 2 replies
-
My original hope was that we could recreate this if the worker container got OOM-killed while it was parsing the DAG. My impression was that at this point in time, CeleryExecutor would have said "I got this one" to Airflow, but would not yet have fully committed to the work, so that other instances wouldn't adopt the task if the worker was then killed. That didn't quite happen (here's a gist with more detail: https://gist.github.com/MatrixManAtYrService/cab3fddf52fd7188599914f4a2257706 ). The worker logs had this error (the WARNING text is from code I used to trigger the OOM kill):
And the task_instance table looked like this:
And the scheduler logs had this error:
So if a single task gets killed because it ran out of memory while the DAG was parsing, Airflow seems to recover as expected. The tasks are set to "failed" immediately; they don't stay "queued" as described above. Next I'll try it again with more than one task. Perhaps if Task C causes the OOM kill while parsing, the timing will be right for Task B to hit the stuck-queued window, or something like that.
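For reference, a minimal sketch of the kind of DAG file involved (this is not the linked gist; the dag_id, buffer size, and operator are placeholders): allocating a large buffer at module level makes the memory spike happen while the file is being parsed, rather than while the task callable runs.

```python
# Rough sketch (not the linked gist): a DAG file that allocates a large buffer at
# import time, so the OOM happens while the worker parses the DAG, not while the
# task callable is running. In practice you'd gate the allocation (e.g. on an env
# var) so only the worker's parse blows up and not the scheduler's.
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

logging.getLogger(__name__).warning("allocating ballast during DAG parsing")

# Placeholder size; it needs to exceed the worker container's memory limit.
_ballast = bytearray(8 * 1024**3)

with DAG(
    dag_id="oom_while_parsing",  # placeholder name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:
    PythonOperator(task_id="task_a", python_callable=lambda: print("hello"))
```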
-
I've been playing with parameters, trying to catch this. Tried many things, for instance:
Here's the updated code: https://gist.github.com/MatrixManAtYrService/6e90a3b8c7c65b8d8b1deaccc8b6f042

I still can't get it to become "stuck". Under load, the transition from queued to failed takes a bit longer, but never hours. The longest I managed to get was just over a minute. I imagine that increasing the load on the scheduler would stretch this out a bit longer, but I don't know about stretching it all the way to "stuck queued".

I have noticed something about the way the UI reports the duration. This one claims to have taken 31 seconds to fail, which is at least reasonable. This one says it took 23:59:20; I think it was probably just 00:00:40. I don't know whether this is part of our bug, or a distraction.
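For what it's worth, 23:59:20 is exactly 24 hours minus 40 seconds, which would be consistent with a roughly 40-second duration being computed backwards and wrapping around one day. A back-of-the-envelope check (my own arithmetic, not anything from the UI code):

```python
# Back-of-the-envelope check: the displayed duration equals 24h minus the plausible
# real duration, consistent with a small negative duration wrapping around a day.
from datetime import timedelta

plausible_real_duration = timedelta(seconds=40)
displayed_duration = timedelta(hours=23, minutes=59, seconds=20)
assert timedelta(hours=24) - plausible_real_duration == displayed_duration
```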
-
This is likely a good candidate for an issue: it's detailed enough and has some logs, etc. It might take time for someone to dig into why and fix it, but at least it will be more prominent than a discussion.
-
Linking issue: #28120
-
It seems what's happening is that the `airflow tasks run <task>` command is failing on the Celery worker. The Celery status is set to failed, but the task in Airflow remains in queued for some arbitrary amount of time (often hours):

Note the `state=queued` and `executor_state=failed` -- Airflow should be marking the task as failed. When this happens, these tasks also bypass `stalled_task_timeout`, because when `update_task_state` is called, the celery state is `STARTED`. `self._set_celery_pending_task_timeout(key, None)` removes the task from the list of tasks eligible for `stalled_task_timeout`, and so these tasks remain in queued indefinitely.
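As a quick way to spot these, here is a sketch (my own helper, not from the report) that lists task instances Airflow still considers queued, together with the Celery task id recorded for each, so they can be cross-checked against Celery's result backend:

```python
# Hypothetical helper (not from the original report): list task instances that are
# still "queued" in Airflow's metadata DB, plus the Celery task id recorded for each,
# so they can be compared against what Celery thinks happened.
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

with create_session() as session:
    queued = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == TaskInstanceState.QUEUED)
        .all()
    )
    for ti in queued:
        print(ti.dag_id, ti.task_id, ti.run_id, ti.queued_dttm, ti.external_executor_id)
```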
Summary of what's happening: the `update_task_state` method calls `fail()`, which is a method from BaseExecutor. `fail` calls CeleryExecutor's `change_state` method. CeleryExecutor's `change_state` method calls BaseExecutor's `change_state` method via `super()`. BaseExecutor's `change_state` method is as follows:
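(Paraphrased from memory of the Airflow 2.x source, not copied verbatim; check your version for the exact code.)

```python
# Paraphrase of BaseExecutor.change_state (Airflow 2.x); not an exact copy.
def change_state(self, key, state, info=None):
    self.log.debug("Changing state: %s", key)
    try:
        # The task never made it into self.running because `airflow tasks run`
        # failed on the worker, so this raises KeyError...
        self.running.remove(key)
    except KeyError:
        # ...and the KeyError is swallowed here, so execution continues.
        self.log.debug("Could not find key: %s", str(key))
    self.event_buffer[key] = state, info
```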
Because the `airflow tasks run` command failed, the task is never set to the running state. The `except KeyError` block allows the code to continue unabated. Once BaseExecutor's `change_state` method completes, CeleryExecutor's `change_state` method completes: `self._set_celery_pending_task_timeout(key, None)` removes the task from the list of tasks that `stalled_task_timeout` checks for, allowing the tasks to remain in queued indefinitely.
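For completeness, here is the CeleryExecutor side, again paraphrased from memory of the 2.x-era code that had `stalled_task_timeout` (not an exact copy):

```python
# Paraphrase of CeleryExecutor.change_state (Airflow 2.x with stalled_task_timeout);
# not an exact copy.
def change_state(self, key, state, info=None):
    super().change_state(key, state, info)
    self.tasks.pop(key, None)
    # Clearing the pending-task timeout here is what drops the task from
    # stalled_task_timeout's bookkeeping, even though it never actually started.
    self._set_celery_pending_task_timeout(key, None)
```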
Instead, when the `airflow tasks run` command fails, the Airflow task instance should be failed or retried (if applicable).