Add handling logic on CeleryExecutor to reschedule task stuck in queued status #28943
Conversation
if self.task_stuck_in_queued_check_interval:
    self.event_scheduler = EventScheduler()

    self.event_scheduler.call_regular_interval(
        self.task_stuck_in_queued_check_interval,
        self.clear_not_launched_queued_tasks,
    )
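For context, here is a minimal, hedged sketch of the EventScheduler pattern this diff wires up, assuming Airflow's airflow.utils.event_scheduler.EventScheduler (a sched.scheduler subclass); the 60-second interval, the placeholder callback body, and the polling loop are illustrative only:

import time

from airflow.utils.event_scheduler import EventScheduler

def clear_not_launched_queued_tasks():
    # placeholder body; the real method would look for stuck-queued task instances
    print("checking for tasks stuck in queued...")

event_scheduler = EventScheduler()
# register the callback so that it re-fires at a fixed interval
event_scheduler.call_regular_interval(60.0, clear_not_launched_queued_tasks)

# inside an executor this would run from the sync()/heartbeat loop;
# run(blocking=False) fires any due events and returns immediately
while True:
    event_scheduler.run(blocking=False)
    time.sleep(1)

The point of run(blocking=False) is that the executor keeps control of its own loop instead of blocking inside the event scheduler.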
I don't think we need an extra scheduler for this; we can probably check during a scheduler loop iteration instead, probably during _critical_section_enqueue_task_instances, to avoid race conditions (one scheduler trying to schedule a task that is being cleared in another).
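To illustrate the race being described, here is a hedged sketch (my illustration, not code from this PR) of clearing stuck-queued task instances under row locks, so that a second scheduler skips rows another one is already clearing; the helper name and the 30-minute threshold are assumptions:

from datetime import datetime, timedelta, timezone

from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import State

def requeue_stuck_queued_tasks(session, threshold=timedelta(minutes=30)):
    # SELECT ... FOR UPDATE SKIP LOCKED: concurrent schedulers never
    # operate on the same TaskInstance row
    cutoff = datetime.now(timezone.utc) - threshold
    stuck = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.state == State.QUEUED,
            TaskInstance.queued_dttm <= cutoff,
        )
        .with_for_update(skip_locked=True)
        .all()
    )
    for ti in stuck:
        ti.state = State.SCHEDULED  # hand the task back to the scheduler
    session.commit()
    return stuck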
@okayhooni could you share your DAG that restarts tasks? Unfortunately, we face the same issue.
@Tonkonozhenko

from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional
import pytz
from airflow.models import BaseOperator
from airflow.models.taskinstance import TaskInstance
from airflow.providers.opsgenie.hooks.opsgenie import OpsgenieAlertHook
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State
from sqlalchemy.orm import Session
DEFAULT_ZOMBIE_THRESHOLD_TIMEDELTA = timedelta(minutes=30)
DEFAULT_ZOMBIE_MARKING_STATE = State.FAILED
DEFAULT_ZOMBIE_OPSGENIE_ALERT_PRIORITY = "P3"
class ZombieTaskKillerOperator(BaseOperator):
"""
Marking zombie tasks stuck on QUEUED state as specified state(default to FAILED)
related issues: https://github.com/apache/airflow/issues?q=is%3Aissue+is%3Aopen+stuck+queued
:param zombie_threshold_timedelta: target threshold timedelta for deciding whether this task is zombie or not
:param zombie_marking_state: marking state of detected queued zombie tasks, default to State.FAILED (= "failed")
:param only_on_current_dag: whether kill tasks only on the current DAG or not, default to True
:param opsgenie_conn_id: Airflow connection ID for alerting with OpsGenie. if not provided, there is no alert
:param opsgenie_priority: The priority of the OpsGenie alert. possible values are: P1, P2, P3, P4, P5, default to P3
"""
    def __init__(
        self,
        *args,
        zombie_threshold_timedelta: timedelta = DEFAULT_ZOMBIE_THRESHOLD_TIMEDELTA,
        zombie_marking_state: State = DEFAULT_ZOMBIE_MARKING_STATE,
        only_on_current_dag: bool = True,
        opsgenie_conn_id: Optional[str] = None,
        opsgenie_priority: str = DEFAULT_ZOMBIE_OPSGENIE_ALERT_PRIORITY,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.zombie_threshold_timedelta = zombie_threshold_timedelta
        self.zombie_marking_state = zombie_marking_state
        self.only_on_current_dag = only_on_current_dag
        self.opsgenie_conn_id = opsgenie_conn_id
        self.opsgenie_priority = opsgenie_priority
    def execute(self, context: Dict[str, Any]) -> None:
        """Execute method for ZombieTaskKillerOperator."""
        queued_zombies = self._find_and_kill_queued_zombies()
        if queued_zombies and self.opsgenie_conn_id:
            self._alert_queued_zombies(zombie_tasks=queued_zombies, log_url=context["task_instance"].log_url)
    @provide_session
    def _find_and_kill_queued_zombies(
        self, *, session: Session = NEW_SESSION
    ) -> Optional[List[TaskInstance]]:
        """
        Find and kill queued zombies by updating their state in the Airflow meta database.

        :param session: SQLAlchemy ORM session (provided by default)
        :return: list of zombie task instance rows returned from the SQLAlchemy ORM query, if any exist
        """
        cur_utc_datetime = datetime.now(pytz.utc)
        self.log.info(f"[cur_utc_datetime]: {cur_utc_datetime}")

        # the queued_dttm field is stored in the meta database in UTC
        queued_zombie_target_filters = [
            TaskInstance.state == State.QUEUED,
            TaskInstance.queued_dttm <= cur_utc_datetime - self.zombie_threshold_timedelta,
        ]
        if self.only_on_current_dag:
            queued_zombie_target_filters.append(TaskInstance.dag_id == self.dag_id)

        queued_zombies_query = session.query(
            TaskInstance.dag_id,
            TaskInstance.run_id,
            TaskInstance.task_id,
            TaskInstance.operator,
            TaskInstance.state,
            TaskInstance.queued_dttm,
            TaskInstance.queue,
            TaskInstance.pool,
        ).filter(*queued_zombie_target_filters)
        queued_zombies = queued_zombies_query.all()

        if not queued_zombies:
            self.log.info("CLEAR! CURRENTLY, THERE IS NO QUEUED ZOMBIE FOUND")
            return None

        self.log.warning(f"[Queued Zombies]: {queued_zombies}")
        queued_zombies_query.update(
            {TaskInstance.state: self.zombie_marking_state}, synchronize_session="fetch"
        )
        self.log.warning(f"MARKING QUEUED ZOMBIES TO `{self.zombie_marking_state}` STATE")
        return queued_zombies
    def _alert_queued_zombies(self, *, zombie_tasks: List[TaskInstance], log_url: str) -> None:
        """
        Alert on Opsgenie about queued zombie tasks that were marked with the specified state.

        :param zombie_tasks: list of zombie task instance rows returned from the SQLAlchemy ORM query
        :param log_url: URL of this zombie-killer task's log
        """
        zombie_tasks_description = [
            dedent(
                f"""\
                [CAUGHT ZOMBIE - {idx}]
                *DAG*: {zombie.dag_id}
                *Run ID*: {zombie.run_id}
                *Task*: {zombie.task_id}
                *Queued Time*: {zombie.queued_dttm}\
                """
            )
            for idx, zombie in enumerate(zombie_tasks, 1)
        ]
        alert_msg_json = {
            "message": f"[Airflow] Marking {len(zombie_tasks)} queued zombie task(s) as `{self.zombie_marking_state}`",
            "description": "\n=================================\n".join(zombie_tasks_description),
            "details": {"Logs": log_url},
            "source": "airflow",
            "priority": self.opsgenie_priority,
            "tags": ["airflow"],
        }
        opsgenie_hook = OpsgenieAlertHook(self.opsgenie_conn_id)
        opsgenie_hook.create_alert(alert_msg_json)

Then, I deployed this custom operator in a DAG like the one below (15-minute interval):

zombie_killer = ZombieTaskKillerOperator(
    task_id="zombie_killer",
    zombie_marking_state=State.SCHEDULED,
    zombie_threshold_timedelta=timedelta(minutes=29),
    only_on_current_dag=False,
    opsgenie_conn_id="ALARM_OPSGENIE_DATAPLATFORM",
    opsgenie_priority="P5",
)

It's not a fancy or graceful solution, but it works well.
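For completeness, a hedged sketch of how such an operator could be wired into a 15-minute DAG, as described above; the dag_id, start date, and schedule value are illustrative assumptions, not values from the original comment:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils.state import State

with DAG(
    dag_id="zombie_task_killer",       # hypothetical DAG id
    schedule_interval="*/15 * * * *",  # every 15 minutes, per the comment
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    # ZombieTaskKillerOperator as defined in the comment above
    zombie_killer = ZombieTaskKillerOperator(
        task_id="zombie_killer",
        zombie_marking_state=State.SCHEDULED,
        zombie_threshold_timedelta=timedelta(minutes=29),
        only_on_current_dag=False,
    )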
Previously, I set the task state to
Thanks, @okayhooni
@Tonkonozhenko This PR is particularly useful for rare cases where
Force-pushed from b9f6784 to 835da8a
This one seems able to handle a problem our users hit really often. I don't know that much about the intricacies of our Celery integration, but it seems to add enough of a harness and a similar approach to what we already use in the K8S executor. So maybe those with more Celery experience can take a look at it? WDYT @ashb @ephraimbuddy @uranusjr, and maybe also @dstandish for the similar case in K8S? I think this one might solve some real pains for users (though it might not address the root cause of the problems observed).
There are some tests to fix, though.
Hard to say. I'd prefer those who have a somewhat deeper understanding of how the Celery integration works (@ashb @ephraimbuddy @uranusjr) to chime in.
If #30108 is indeed the right path forward for replacing stalled_task_timeout, I suspect it makes sense to also remove task_adoption_timeout.
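For reference, a hedged sketch of how these two [celery] options can be read from the Airflow config; the fallback values here are assumptions, and the exact defaults vary by Airflow version:

from airflow.configuration import conf

# both options live in the [celery] section of airflow.cfg
task_adoption_timeout = conf.getint("celery", "task_adoption_timeout", fallback=600)
stalled_task_timeout = conf.getint("celery", "stalled_task_timeout", fallback=0)
print(f"task_adoption_timeout={task_adoption_timeout}s, stalled_task_timeout={stalled_task_timeout}s")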
related: #ISSUE
We have been using Airflow with the Celery executor on EKS to schedule up to 8,000 task instances per day on a production data pipeline, and we have suffered from task instances stuck in the queued status. On average, we found 1~3 task instances stuck in the queued state per day, and those tasks did not honor execution_timeout or write any task logs, because they had never been launched by any Celery worker!

I found logic for handling similar issues in KubernetesExecutor's method clear_not_launched_queued_tasks(). So I made a custom DAG and task to perform a similar operation: find tasks stuck in the queued status and re-schedule them by updating the state field to scheduled with a SQLAlchemy query (like {TaskInstance.state: State.SCHEDULED}).

This DAG worked very well over the last few months, but last Friday we hit the funny situation that a task instance of this custom DAG itself got stuck in the queued state. So I guess this checking logic would be better placed on the executor side, using Airflow's built-in sched-based scheduler (EventScheduler), like KubernetesExecutor does.

If we knew the root cause of this problem, there would be a more elegant solution. But I have read some of the related issues linked above, and I guess there is not just one reason tasks get stuck; it varies case by case. (Some parts of this issue are marked as fixed, but the problem persists through the latest release, v2.5.)
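To make the description concrete, here is a minimal hedged sketch of the re-schedule query described above (find stuck-queued task instances and flip them back to scheduled); the session handling and the threshold value are illustrative assumptions:

from datetime import datetime, timedelta, timezone

from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State

STUCK_THRESHOLD = timedelta(minutes=30)  # assumed threshold

with create_session() as session:
    cutoff = datetime.now(timezone.utc) - STUCK_THRESHOLD
    # bulk-update everything stuck in QUEUED back to SCHEDULED so the
    # scheduler picks it up again, mirroring {TaskInstance.state: State.SCHEDULED}
    session.query(TaskInstance).filter(
        TaskInstance.state == State.QUEUED,
        TaskInstance.queued_dttm <= cutoff,
    ).update({TaskInstance.state: State.SCHEDULED}, synchronize_session="fetch")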