diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 4b9e3425739dd..9612727a0b105 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2224,6 +2224,13 @@ type: integer example: ~ default: "120" + - name: worker_pods_queued_check_interval + description: | + How often in seconds to check for task instances stuck in "queued" status without a pod + version_added: 2.2.0 + type: integer + example: ~ + default: "60" - name: worker_pods_pending_timeout_batch_size description: | How many pending pods to check for timeout violations in each check interval. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 558e3551520ed..84af35ca288e6 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1097,6 +1097,9 @@ worker_pods_pending_timeout = 300 # How often in seconds to check if Pending workers have exceeded their timeouts worker_pods_pending_timeout_check_interval = 120 +# How often in seconds to check for task instances stuck in "queued" status without a pod +worker_pods_queued_check_interval = 60 + # How many pending pods to check for timeout violations in each check interval. # You may want this higher if you have a very large cluster and/or use ``multi_namespace_mode``. worker_pods_pending_timeout_batch_size = 100 diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 5e748da1e827c..8caf3baf5feaf 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -431,40 +431,48 @@ def __init__(self): self.kube_client: Optional[client.CoreV1Api] = None self.scheduler_job_id: Optional[str] = None self.event_scheduler: Optional[EventScheduler] = None + self.last_handled: Dict[TaskInstanceKey, int] = {} super().__init__(parallelism=self.kube_config.parallelism) @provide_session def clear_not_launched_queued_tasks(self, session=None) -> None: """ - If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or - may not - have been launched. Thus on starting up the scheduler let's check every - "Queued" task to - see if it has been launched (ie: if there is a corresponding pod on kubernetes) - - If it has been launched then do nothing, otherwise reset the state to "None" so - the task - will be rescheduled - - This will not be necessary in a future version of airflow in which there is - proper support - for State.LAUNCHED + Tasks can end up in a "Queued" state through either the executor being + abruptly shut down (leaving a non-empty task_queue on this executor) + or when a rescheduled/deferred operator comes back up for execution + (with the same try_number) before the pod of its previous incarnation + has been fully removed (we think). + + This method checks each of those tasks to see if the corresponding pod + is around, and if not, and there's no matching entry in our own + task_queue, marks it for re-execution. """ self.log.debug("Clearing tasks that have not been launched") if not self.kube_client: raise AirflowException(NOT_STARTED_MESSAGE) queued_tasks = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all() - self.log.info('When executor started up, found %s queued task instances', len(queued_tasks)) + self.log.info('Found %s queued task instances', len(queued_tasks)) + + # Go through the "last seen" dictionary and clean out old entries + allowed_age = self.kube_config.worker_pods_queued_check_interval * 3 + for key, timestamp in list(self.last_handled.items()): + if time.time() - timestamp > allowed_age: + del self.last_handled[key] for task in queued_tasks: self.log.debug("Checking task %s", task) + + # Check to see if we've handled it ourselves recently + if task.key in self.last_handled: + continue + + # Build the pod selector dict_string = "dag_id={},task_id={},airflow-worker={}".format( pod_generator.make_safe_label_value(task.dag_id), pod_generator.make_safe_label_value(task.task_id), pod_generator.make_safe_label_value(str(self.scheduler_job_id)), ) - kwargs = dict(label_selector=dict_string) if self.kube_config.kube_client_request_args: kwargs.update(**self.kube_config.kube_client_request_args) @@ -486,7 +494,7 @@ def clear_not_launched_queued_tasks(self, session=None) -> None: TaskInstance.dag_id == task.dag_id, TaskInstance.task_id == task.task_id, TaskInstance.run_id == task.run_id, - ).update({TaskInstance.state: State.NONE}) + ).update({TaskInstance.state: State.SCHEDULED}) def start(self) -> None: """Starts the executor""" @@ -504,6 +512,12 @@ def start(self) -> None: self.kube_config.worker_pods_pending_timeout_check_interval, self._check_worker_pods_pending_timeout, ) + self.event_scheduler.call_regular_interval( + self.kube_config.worker_pods_queued_check_interval, + self.clear_not_launched_queued_tasks, + ) + # We also call this at startup as that's the most likely time to see + # stuck queued tasks self.clear_not_launched_queued_tasks() def execute_async( @@ -530,6 +544,9 @@ def execute_async( raise AirflowException(NOT_STARTED_MESSAGE) self.event_buffer[key] = (State.QUEUED, self.scheduler_job_id) self.task_queue.put((key, command, kube_executor_config, pod_template_file)) + # We keep a temporary local record that we've handled this so we don't + # try and remove it from the QUEUED state while we process it + self.last_handled[key] = time.time() def sync(self) -> None: """Synchronize task state.""" diff --git a/airflow/kubernetes/kube_config.py b/airflow/kubernetes/kube_config.py index ef3283155ff73..c85d7df6dcc4c 100644 --- a/airflow/kubernetes/kube_config.py +++ b/airflow/kubernetes/kube_config.py @@ -66,6 +66,9 @@ def __init__(self): self.worker_pods_pending_timeout_batch_size = conf.getint( self.kubernetes_section, 'worker_pods_pending_timeout_batch_size' ) + self.worker_pods_queued_check_interval = conf.getint( + self.kubernetes_section, 'worker_pods_queued_check_interval' + ) kube_client_request_args = conf.get(self.kubernetes_section, 'kube_client_request_args') if kube_client_request_args: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 2fa0b8af1451d..e7911ce3984de 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -608,7 +608,7 @@ def test_pending_pod_timeout(self, mock_kubescheduler, mock_get_kube_client, moc executor = KubernetesExecutor() executor.job_id = "123" executor.start() - assert 1 == len(executor.event_scheduler.queue) + assert 2 == len(executor.event_scheduler.queue) executor._check_worker_pods_pending_timeout() mock_kube_client.list_namespaced_pod.assert_called_once_with(