Simplify logic to resolve tasks stuck in queued despite stalled_task_timeout #30108
Conversation
For backward compatibility and semver reasons, would making the tasks go to failed be considered a breaking change? I know that behavior was different from the comparable k8s executor behavior and seems a little askew from expectations, so it could be considered a bug, I guess? And this does cover that case plus more, so that's good.
Would appreciate your feedback @okayhooni & @repl-chris!
airflow/jobs/scheduler_job.py
Outdated
@@ -853,6 +858,10 @@ def _run_scheduler_loop(self) -> None:
        # Check on start up, then every configured interval
        self.adopt_or_reset_orphaned_tasks()

        if self._task_queued_timeout:
            self._fail_tasks_stuck_in_queued()
            timers.call_regular_interval(self._task_queued_timeout, self._fail_tasks_stuck_in_queued)
Do you need both of these lines? Don't you only want the one wrapped in timers.call_regular_interval?
I was following the precedent set by adopt_or_reset_orphaned_tasks (line 858), which runs on scheduler startup and then at a regular interval.
This probably doesn't need to run at startup. Adoption makes sense to do because it's pretty likely that another scheduler just shut down if we have a new one starting, but I don't think we have a similar situation here.
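To make the suggestion concrete, here is a minimal standalone sketch of registering only the interval check, using the EventScheduler helper the scheduler loop builds its timers from (airflow.utils.event_scheduler in Airflow 2.x). The timeout value and the stub callback are placeholders, not the PR's real code:

```python
# Sketch only: register the stuck-in-queued check on an interval, with no
# immediate call at startup. The callback and the timeout value are placeholders.
from airflow.utils.event_scheduler import EventScheduler


def fail_tasks_stuck_in_queued() -> None:
    print("checking for task instances stuck in queued")


task_queued_timeout = 600.0  # hypothetical value of scheduler.task_queued_timeout

timers = EventScheduler()
timers.call_regular_interval(task_queued_timeout, fail_tasks_stuck_in_queued)

# The scheduler's main loop then drives the registered timers with
# non-blocking runs, roughly: timers.run(blocking=False) on each iteration.
```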
@@ -1037,6 +1037,7 @@ task_track_started = True
# :ref:`task duration<ui:task-duration>` from the task's :ref:`landing time<ui:landing-times>`.
task_adoption_timeout = 600

# Deprecated. Use scheduler.task_queued_timeout instead.
Is this deprecation reflected in the code base by a DeprecationWarning?
Updated to include DeprecationWarning
We should remove it if it's no longer needed.
Would removing it unnecessarily break backward compatibility?
No
airflow/jobs/scheduler_job.py
Outdated
@@ -1408,6 +1416,40 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
        )
        self.executor.send_callback(request)

    @provide_session
    def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
This would affect all executors, not just Celery, and we have some other settings in Kubernetes for pending tasks, etc. WDYT?
IMO this is a general problem that applies to both kubernetes & celery executors. The relevant k8s exec setting is worker_pods_queued_check_interval -- I think this can also be handled in the scheduler. I also think this can probably replace task-adoption-timeout
If you agree, I'll remove those configurations as well.
worker_pods_queued_check_interval is similar, but different in that it won't automatically just reset the TI. It first checks to see if the pod exists.
worker_pods_pending_timeout is essentially this same process though. It should probably be deprecated as well (though, not sure how config handles many -> one).
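For readers skimming the thread, here is a rough, self-contained sketch of what a scheduler-side check along these lines could look like. It is an illustration of the idea, not the PR's actual implementation; the function name, the query, and the error message are assumptions.

```python
# Illustrative sketch, not the PR's code: fail task instances that have been
# sitting in QUEUED longer than a configured timeout, routing them through
# handle_failure() so retry semantics are preserved.
from datetime import timedelta

from sqlalchemy.orm import Session

from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState


@provide_session
def fail_tasks_stuck_in_queued(task_queued_timeout: float, session: Session = NEW_SESSION) -> None:
    cutoff = timezone.utcnow() - timedelta(seconds=task_queued_timeout)
    stuck_tis = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.state == TaskInstanceState.QUEUED,
            TaskInstance.queued_dttm < cutoff,
        )
        .all()
    )
    for ti in stuck_tis:
        # handle_failure() follows the normal failure path, so tasks with
        # retries remaining go to UP_FOR_RETRY instead of FAILED.
        ti.handle_failure(error="Task stuck in queued state for too long", session=session)
```

Whether something like this should also subsume the executor-specific knobs (task_adoption_timeout, worker_pods_pending_timeout) is exactly the many-to-one deprecation question raised above.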
airflow/jobs/scheduler_job.py
Outdated
        self._task_queued_timeout = conf.getfloat(
            "scheduler",
            "task_queued_timeout",
            fallback=stalled_task_timeout,
You don't have to do this manually; Airflow does it for you when you mark it as a deprecated option.
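For context on what "Airflow does it for you" means: AirflowConfigParser in airflow/configuration.py keeps a deprecated_options mapping, and once the old key is registered there, reading the new key falls back to the old one and emits a DeprecationWarning automatically. A hedged sketch of such an entry (the exact tuple layout, old section, and version string are assumptions drawn from memory of the 2.x code base):

```python
# Illustrative only: an entry mapping the new option to its deprecated
# predecessor, in the (new_section, new_key) -> (old_section, old_key, version)
# shape used by AirflowConfigParser.deprecated_options in Airflow 2.x.
deprecated_options = {
    ("scheduler", "task_queued_timeout"): ("celery", "stalled_task_timeout", "2.6.0"),
}
```

How this mechanism handles several old keys collapsing into one new key is the open question mentioned earlier for worker_pods_pending_timeout and task_adoption_timeout.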
airflow/jobs/scheduler_job.py
Outdated
        queued for longer than `self._task_queued_timeout` as failed. If the task has
        available retries, it will be retried.
I don't see any handling of retry state in this code; we go straight to failed and bypass retry logic (unless I've forgotten how that works?).
Would it be sufficient to call TI.handle_failure()?
It seems like it should be, based on my read of TI.handle_failure.
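For reference, the retry decision inside TaskInstance.handle_failure boils down to an eligibility check. The helper below is a paraphrased sketch of that choice (the real method also runs failure callbacks, email alerting, and logging), included to show why routing stuck tasks through handle_failure() keeps retry behavior intact:

```python
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState


def resolve_failure_state(ti: TaskInstance, force_fail: bool = False) -> TaskInstanceState:
    # Paraphrased: handle_failure() sends a task instance to UP_FOR_RETRY when
    # it is still eligible to retry, otherwise to FAILED.
    if not force_fail and ti.is_eligible_to_retry():
        return TaskInstanceState.UP_FOR_RETRY
    return TaskInstanceState.FAILED
```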
I'm unclear on why task adoption was removed. It covers a whole class of problem (running tasks whose scheduler died) that doesn't seem to be otherwise addressed by this PR.
Accidentally closed this and nuked my changes. I'll open a new PR or re-open this one.
closes: #28120
closes: #21225
closes: #28943
Tasks occasionally get stuck in queued and aren't resolved by stalled_task_timeout (#28120). This PR moves the logic for handling stalled tasks to the scheduler and simplifies it by marking any task that has been queued for more than scheduler.task_queued_timeout as failed, allowing it to be retried if the task has available retries. This doesn't require an additional scheduler, nor does it allow for the possibility of tasks getting stuck in an infinite loop of scheduled -> queued -> scheduled -> ... -> queued, as in #28943.
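A quick note for anyone wanting to try the new knob once it lands: Airflow config keys can be overridden via AIRFLOW__{SECTION}__{KEY} environment variables, so something like the following (the 600-second value is just an example) should work without editing airflow.cfg:

```python
# Example only: override scheduler.task_queued_timeout via the standard
# AIRFLOW__{SECTION}__{KEY} environment-variable convention, then read it back.
import os

os.environ["AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT"] = "600"

from airflow.configuration import conf

print(conf.getfloat("scheduler", "task_queued_timeout"))  # -> 600.0
```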