Skip to content

Commit

Permalink
[v2-10-test] Re-queue tassk when they are stuck in queued (#43520) (#…
Browse files Browse the repository at this point in the history
…44158)

* [v2-10-test] Re-queue tassk when they are stuck in queued (#43520)

The old "stuck in queued" logic just failed the tasks.  Now we requeue them.  We accomplish this by revoking the task from executor and setting state to scheduled.  We'll re-queue it up to 2 times.  Number of times is configurable by hidden config.

We added a method to base executor revoke_task because, it's a discrete operation that is required for this feature, and it might be useful in other cases e.g. when detecting as zombies etc.  We set state to failed or scheduled directly from scheduler (rather than sending through the event buffer) because event buffer makes more sense for handling external events -- why round trip through the executor and back to scheduler when scheduler is initiating the action?  Anyway this avoids having to deal with "state mismatch" issues when processing events.

---------

(cherry picked from commit a41feeb)

Co-authored-by: Daniel Imberman <[email protected]>
Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>

* fix test_handle_stuck_queued_tasks_multiple_attempts (#44093)

---------

Co-authored-by: Daniel Imberman <[email protected]>
Co-authored-by: Daniel Standish <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: GPK <[email protected]>
  • Loading branch information
5 people authored and utkarsharma2 committed Dec 9, 2024
1 parent 1738256 commit a3e5e34
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 46 deletions.
26 changes: 24 additions & 2 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple

import pendulum
from deprecated import deprecated

from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
Expand Down Expand Up @@ -545,7 +546,12 @@ def terminate(self):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError()

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
@deprecated(
reason="Replaced by function `revoke_task`.",
category=RemovedInAirflow3Warning,
action="ignore",
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.
Expand All @@ -556,7 +562,23 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # p
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
raise NotImplementedError()
raise NotImplementedError

def revoke_task(self, *, ti: TaskInstance):
"""
Attempt to remove task from executor.
It should attempt to ensure that the task is no longer running on the worker,
and ensure that it is cleared out from internal data structures.
It should *not* change the state of the task in airflow, or add any events
to the event buffer.
It should not raise any error.
:param ti: Task instance to remove
"""
raise NotImplementedError

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
"""
Expand Down
161 changes: 129 additions & 32 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import time
import warnings
from collections import Counter, defaultdict, deque
from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
from functools import lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

from deprecated import deprecated
from sqlalchemy import and_, delete, func, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
Expand Down Expand Up @@ -97,6 +99,9 @@
DR = DagRun
DM = DagModel

TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
""":meta private:"""


@dataclass
class ConcurrencyMap:
Expand Down Expand Up @@ -228,6 +233,13 @@ def __init__(
stalled_task_timeout, task_adoption_timeout, worker_pods_pending_timeout, task_queued_timeout
)

# this param is intentionally undocumented
self._num_stuck_queued_retries = conf.getint(
section="scheduler",
key="num_stuck_in_queued_retries",
fallback=2,
)

self.do_pickle = do_pickle

if log:
Expand Down Expand Up @@ -1093,7 +1105,7 @@ def _run_scheduler_loop(self) -> None:

timers.call_regular_interval(
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
self._fail_tasks_stuck_in_queued,
self._handle_tasks_stuck_in_queued,
)

timers.call_regular_interval(
Expand Down Expand Up @@ -1141,6 +1153,7 @@ def _run_scheduler_loop(self) -> None:
for executor in self.job.executors:
try:
# this is backcompat check if executor does not inherit from BaseExecutor
# todo: remove in airflow 3.0
if not hasattr(executor, "_task_event_logs"):
continue
with create_session() as session:
Expand Down Expand Up @@ -1772,48 +1785,132 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
self.job.executor.send_callback(request)

@provide_session
def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
"""
Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
Handle the scenario where a task is queued for longer than `task_queued_timeout`.
Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
track of a task, a cluster can't further scale up its workers, etc.), but tasks
should not be stuck in queued for a long time. This will mark tasks stuck in
queued for longer than `self._task_queued_timeout` as failed. If the task has
available retries, it will be retried.
should not be stuck in queued for a long time.
We will attempt to requeue the task (by revoking it from executor and setting to
scheduled) up to 2 times before failing the task.
"""
self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
for ti in stuck_tis:
executor.revoke_task(ti=ti)
self._maybe_requeue_stuck_ti(
ti=ti,
session=session,
)
except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
# todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0.
# after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should
# just continue immediately.
self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
continue

tasks_stuck_in_queued = session.scalars(
def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
"""Query db for TIs that are stuck in queued."""
return session.scalars(
select(TI).where(
TI.state == TaskInstanceState.QUEUED,
TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
TI.queued_by_job_id == self.job.id,
)
).all()
)

for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
for ti in stuck_tis:
if repr(ti) in cleaned_up_task_instances:
self.log.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
)
session.add(
Log(
event="stuck in queued",
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
"available retries, it will be retried."
),
)
)
except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
def _maybe_requeue_stuck_ti(self, *, ti, session):
"""
Requeue task if it has not been attempted too many times.
Otherwise, fail it.
"""
num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
if num_times_stuck < self._num_stuck_queued_retries:
self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id)
session.add(
Log(
event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
task_instance=ti.key,
extra=(
f"Task was in queued state for longer than {self._task_queued_timeout} "
"seconds; task state will be set back to scheduled."
),
)
)
self._reschedule_stuck_task(ti)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
ti,
)
session.add(
Log(
event="stuck in queued tries exceeded",
task_instance=ti.key,
extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
)
)
ti.set_state(TaskInstanceState.FAILED, session=session)

@deprecated(
reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
category=RemovedInAirflow3Warning,
action="ignore",
)
def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
"""
Try to invoke stuck in queued cleanup for older executor interface.
TODO: remove in airflow 3.0
Here we handle case where the executor pre-dates the interface change that
introduced `cleanup_tasks_stuck_in_queued` and deprecated `cleanup_stuck_queued_tasks`.
"""
with suppress(NotImplementedError):
for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
self.log.warning(
"Task instance %s stuck in queued. Will be set to failed.",
ti_repr,
)

@provide_session
def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
session.execute(
update(TI)
.where(TI.filter_for_tis([ti]))
.values(
state=TaskInstanceState.SCHEDULED,
queued_dttm=None,
)
.execution_options(synchronize_session=False)
)

@provide_session
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
"""
Check the Log table to see how many times a taskinstance has been stuck in queued.
We can then use this information to determine whether to reschedule a task or fail it.
"""
return (
session.query(Log)
.where(
Log.task_id == ti.task_id,
Log.dag_id == ti.dag_id,
Log.run_id == ti.run_id,
Log.map_index == ti.map_index,
Log.try_number == ti.try_number,
Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
)
.count()
)

@provide_session
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
Expand Down Expand Up @@ -2102,7 +2199,7 @@ def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
updated_count = sum(self._set_orphaned(dataset) for dataset in orphaned_dataset_query)
Stats.gauge("dataset.orphaned", updated_count)

def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
"""Organize TIs into lists per their respective executor."""
_executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
for ti in tis:
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,7 @@ repos
repr
req
reqs
requeued
Reserialize
reserialize
reserialized
Expand Down
Loading

0 comments on commit a3e5e34

Please sign in to comment.