Fix stuck "queued" tasks in KubernetesExecutor (#18152)
There is a set of circumstances where TaskInstances can get "stuck" in the QUEUED state when running under KubernetesExecutor: they claim to have a pod scheduled (and so are queued) but do not actually have one, and so sit there forever.

It appears this happens occasionally with reschedule sensors and now more often with deferrable tasks, when the task instance defers/reschedules and then resumes before the old pod has vanished. It would also, I believe, happen when the Executor hard-exits with items still in its internal queues.

There was a pre-existing method in there to clean up stuck queued tasks, but it only ran once, on executor start. I have modified it to be safe to run periodically (by teaching it not to touch things that the executor looked at recently), and then made it run every so often (60 seconds by default).

This is not a perfect fix: the only real fix would be far more detailed state tracking as part of TaskInstance or another table, plus a re-architecture of the KubernetesExecutor. However, this should reduce the number of times this happens very significantly, so it should do for now.
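
For readers skimming the diff below, the shape of the fix is roughly as follows. This is a minimal standalone sketch rather than the executor code itself: TaskKey, pods_exist_for and reset_to_scheduled are hypothetical stand-ins for TaskInstanceKey, the label-selector pod lookup, and the SQLAlchemy update that the real clear_not_launched_queued_tasks performs.

import time
from typing import Callable, Dict, Iterable, Tuple

TaskKey = Tuple[str, str, str]  # stand-in for TaskInstanceKey: (dag_id, task_id, run_id)


def clear_stuck_queued_tasks(
    queued_tasks: Iterable,                        # TaskInstance-like objects exposing .key
    last_handled: Dict[TaskKey, float],            # keys this executor queued recently
    pods_exist_for: Callable[[object], bool],      # stand-in for the label-selector pod lookup
    reset_to_scheduled: Callable[[object], None],  # stand-in for the SQLAlchemy UPDATE
    check_interval: int = 60,                      # worker_pods_queued_check_interval
) -> None:
    """Reset queued task instances that have no pod and were not queued by us recently."""
    now = time.time()

    # Prune bookkeeping entries old enough that their pod should have appeared by now,
    # so the dictionary cannot grow without bound.
    allowed_age = check_interval * 3
    for key, timestamp in list(last_handled.items()):
        if now - timestamp > allowed_age:
            del last_handled[key]

    for task in queued_tasks:
        if task.key in last_handled:
            # We queued this ourselves moments ago; give Kubernetes time to create the pod.
            continue
        if not pods_exist_for(task):
            # No pod and no recent activity from us: hand the task back to the scheduler.
            reset_to_scheduled(task)
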
Andrew Godwin authored Sep 20, 2021
1 parent a01c08b commit bada372
Showing 5 changed files with 47 additions and 17 deletions.
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
@@ -2224,6 +2224,13 @@
type: integer
example: ~
default: "120"
- name: worker_pods_queued_check_interval
description: |
How often in seconds to check for task instances stuck in "queued" status without a pod
version_added: 2.2.0
type: integer
example: ~
default: "60"
- name: worker_pods_pending_timeout_batch_size
description: |
How many pending pods to check for timeout violations in each check interval.
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -1097,6 +1097,9 @@ worker_pods_pending_timeout = 300
# How often in seconds to check if Pending workers have exceeded their timeouts
worker_pods_pending_timeout_check_interval = 120

# How often in seconds to check for task instances stuck in "queued" status without a pod
worker_pods_queued_check_interval = 60

# How many pending pods to check for timeout violations in each check interval.
# You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``.
worker_pods_pending_timeout_batch_size = 100
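
The new option lives in the [kubernetes] section next to the pending-timeout settings and is read the same way (see kube_config.py below). As a small, hedged illustration of consuming it programmatically, where the fallback values simply repeat the defaults documented above:

# Illustration only; assumes the standard Airflow config API.
from airflow.configuration import conf

pending_interval = conf.getint("kubernetes", "worker_pods_pending_timeout_check_interval", fallback=120)
queued_interval = conf.getint("kubernetes", "worker_pods_queued_check_interval", fallback=60)
print(f"pending-pod check every {pending_interval}s, stuck-queued check every {queued_interval}s")
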
49 changes: 33 additions & 16 deletions airflow/executors/kubernetes_executor.py
@@ -431,40 +431,48 @@ def __init__(self):
self.kube_client: Optional[client.CoreV1Api] = None
self.scheduler_job_id: Optional[str] = None
self.event_scheduler: Optional[EventScheduler] = None
self.last_handled: Dict[TaskInstanceKey, int] = {}
super().__init__(parallelism=self.kube_config.parallelism)

@provide_session
def clear_not_launched_queued_tasks(self, session=None) -> None:
"""
If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or
may not
have been launched. Thus on starting up the scheduler let's check every
"Queued" task to
see if it has been launched (ie: if there is a corresponding pod on kubernetes)
If it has been launched then do nothing, otherwise reset the state to "None" so
the task
will be rescheduled
This will not be necessary in a future version of airflow in which there is
proper support
for State.LAUNCHED
Tasks can end up in a "Queued" state through either the executor being
abruptly shut down (leaving a non-empty task_queue on this executor)
or when a rescheduled/deferred operator comes back up for execution
(with the same try_number) before the pod of its previous incarnation
has been fully removed (we think).
This method checks each of those tasks to see if the corresponding pod
is around, and if not, and there's no matching entry in our own
task_queue, marks it for re-execution.
"""
self.log.debug("Clearing tasks that have not been launched")
if not self.kube_client:
raise AirflowException(NOT_STARTED_MESSAGE)
queued_tasks = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
self.log.info('When executor started up, found %s queued task instances', len(queued_tasks))
self.log.info('Found %s queued task instances', len(queued_tasks))

# Go through the "last seen" dictionary and clean out old entries
allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
for key, timestamp in list(self.last_handled.items()):
if time.time() - timestamp > allowed_age:
del self.last_handled[key]

for task in queued_tasks:

self.log.debug("Checking task %s", task)

# Check to see if we've handled it ourselves recently
if task.key in self.last_handled:
continue

# Build the pod selector
dict_string = "dag_id={},task_id={},airflow-worker={}".format(
pod_generator.make_safe_label_value(task.dag_id),
pod_generator.make_safe_label_value(task.task_id),
pod_generator.make_safe_label_value(str(self.scheduler_job_id)),
)

kwargs = dict(label_selector=dict_string)
if self.kube_config.kube_client_request_args:
kwargs.update(**self.kube_config.kube_client_request_args)
@@ -486,7 +494,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
TaskInstance.dag_id == task.dag_id,
TaskInstance.task_id == task.task_id,
TaskInstance.run_id == task.run_id,
).update({TaskInstance.state: State.NONE})
).update({TaskInstance.state: State.SCHEDULED})

def start(self) -> None:
"""Starts the executor"""
@@ -504,6 +512,12 @@ def start(self) -> None:
self.kube_config.worker_pods_pending_timeout_check_interval,
self._check_worker_pods_pending_timeout,
)
self.event_scheduler.call_regular_interval(
self.kube_config.worker_pods_queued_check_interval,
self.clear_not_launched_queued_tasks,
)
# We also call this at startup as that's the most likely time to see
# stuck queued tasks
self.clear_not_launched_queued_tasks()

def execute_async(
@@ -530,6 +544,9 @@ def execute_async(
raise AirflowException(NOT_STARTED_MESSAGE)
self.event_buffer[key] = (State.QUEUED, self.scheduler_job_id)
self.task_queue.put((key, command, kube_executor_config, pod_template_file))
# We keep a temporary local record that we've handled this so we don't
# try and remove it from the QUEUED state while we process it
self.last_handled[key] = time.time()

def sync(self) -> None:
"""Synchronize task state."""
3 changes: 3 additions & 0 deletions airflow/kubernetes/kube_config.py
@@ -66,6 +66,9 @@ def __init__(self):
self.worker_pods_pending_timeout_batch_size = conf.getint(
self.kubernetes_section, 'worker_pods_pending_timeout_batch_size'
)
self.worker_pods_queued_check_interval = conf.getint(
self.kubernetes_section, 'worker_pods_queued_check_interval'
)

kube_client_request_args = conf.get(self.kubernetes_section, 'kube_client_request_args')
if kube_client_request_args:
2 changes: 1 addition & 1 deletion tests/executors/test_kubernetes_executor.py
@@ -608,7 +608,7 @@ def test_pending_pod_timeout(self, mock_kubescheduler, mock_get_kube_client, moc
executor = KubernetesExecutor()
executor.job_id = "123"
executor.start()
assert 1 == len(executor.event_scheduler.queue)
assert 2 == len(executor.event_scheduler.queue)
executor._check_worker_pods_pending_timeout()

mock_kube_client.list_namespaced_pod.assert_called_once_with(
