Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KubernetesJobWatcher does not delete worker pods #14974

Closed
mrpowerus opened this issue Mar 24, 2021 · 10 comments
Closed

KubernetesJobWatcher does not delete worker pods #14974

mrpowerus opened this issue Mar 24, 2021 · 10 comments
Assignees
Labels
affected_version:2.0 Issues Reported for 2.0 kind:bug This is a clearly a bug provider:cncf-kubernetes Kubernetes provider related issues

Comments

@mrpowerus
Copy link

mrpowerus commented Mar 24, 2021

Apache Airflow version: 2.0.0 and 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.18.4 (AKS)

Environment:

  • Cloud provider or hardware configuration: Azure Cloud
  • OS (e.g. from /etc/os-release): Debian GNU/Linux 10 (Buster)
  • Kernel (e.g. uname -a): Linux airflow-scheduler-5cf464667c-7zd6j 5.4.0-1040-azure Issue 23 Explanation #42~18.04.1-Ubuntu SMP Mon Feb 8 19:05:32 UTC 2021 x86_64 GNU/Linux
  • Others: Image apache/airflow:2.0.1-python3.8

What happened:

KubernetesJobWatcher does not delete Worker Pods after they are assigned the 'status.phase=Succeeded'. But this only happens after 30-ish minutes of complete inactivity of the Kubernetes Cluster.

What you expected to happen:

The KubernetesJobWatcher should delete Worker Pods after they have been successful at any time. As my config states (I verfied this with airflow config:

    [kubernetes]
    pod_template_file = /opt/airflow/pod_template_file.yaml
    worker_container_repository = apache/airflow
    worker_container_tag = 2.0.1-python3.8
    delete_worker_pods = True
    delete_worker_pods_on_failure = False

The Executor tries over-and-over again to adopt completed pods.

This is successful. However, the Pods are not cleaned by the KubernetesJobWatcher as no logging of the watcher appears. (I would expect logging from this line)

After some digging, I think the watch.stream() from from kubernetes import client, watch which is called in https://github.com/apache/airflow/blob/v2-0-stable/airflow/executors/kubernetes_executor.py#L143 expires after a long time of complete inactivity. This is also explicitly mentioned in the docstring of the kubernetes.watch.Stream, which was added in this commit after version 11.0.0.

However, my Airflow is using the constraints file which uses the previous version of the Kubernetes client (version 11.0.0) which contains the following watcher.stream.

It seems that Airflow can recover itself by resetting the resource-version. But this does not seem to work for some reason. (I'm currently investigating why)

I think Airflow should be able to recover from this issue automatically. Otherwise I should run a dummy task each 30-ish minutes or so, just to keep the kubernetes.watch.stream() alive.

How to reproduce it:
Run Airflow 2+ in a Kubernetes cluster which has no activity at all for 30-ish minutes. Then start an operator. The Kubernetes Worker will not be deleted.

@mrpowerus mrpowerus added the kind:bug This is a clearly a bug label Mar 24, 2021
@boring-cyborg
Copy link

boring-cyborg bot commented Mar 24, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@vikramkoka vikramkoka added provider:cncf-kubernetes Kubernetes provider related issues affected_version:2.0 Issues Reported for 2.0 labels Mar 25, 2021
@mrpowerus
Copy link
Author

mrpowerus commented Mar 29, 2021

After closer inspection and debugging, it seems that a urllib3.exceptions.ProtocolError is raised by the Kubernetes client. This error is not accounted for in Airflow. Airflow just goes to Exception and raises. This exits the while loop and that's it. No mercy.

@uranusjr
Copy link
Member

I wonder why ProtocolError is raised when and only when there’s a long inactivity period. It’s of course easy to catch the exception without aborting, but that’s only solving the sympton. There should be a deeper cause to this.

@mrpowerus
Copy link
Author

mrpowerus commented Mar 30, 2021

After debugging the TCP/IP connections, I found that the connection to the KubeAPI was reset after some minutes of complete inactivity for the kubernetes.Watcher.stream(). However, the watcher seems to think the connection is still fine and continues listening for some (unknown) reason and no error appears.

This would also explain the fact why no logging of the type of Event: ...... was showing up at some point.

The fix seems to be to reset the watcher.stream, by adding the timeout_seconds argument. This ensures that the connection is restarted after some time, which keeps the connection alive.

My previous comment about the ProtocolError is not correct, as the KubernetesWatcher Procees did not raise an Exception. (I only assumed so as it appeared when I was testing my code locally).

This patch seems to solve the problem:

--- kubernetes_executor.py	2021-03-30 13:40:10.957157100 +0200
+++ kubernetes_executor.py	2021-03-30 13:45:13.836000000 +0200
@@ -142,7 +142,7 @@
             list_worker_pods = functools.partial(
                 watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
             )
-        for event in list_worker_pods():
+        for event in list_worker_pods(timeout_seconds=60):
             task = event['object']
             self.log.info('Event: %s had an event of type %s', task.metadata.name, event['type'])
             if event['type'] == 'ERROR':

@ephraimbuddy
Copy link
Contributor

@mrpowerus please can you give more detailed steps to reproduce this?

That said, I'm not Ok with your configuration.
These lines in your config

worker_container_repository = apache/airflow
worker_container_tag = 2.0.1-python3.8

makes every task start pulling 2.0.1-python3.8 image afresh before they can create a container if images.airflow.repository and images.airflow.tag are configured differently. 2.0.1-python3.8 image takes a long time to be pulled and you can have network error in the process.

Interestingly, When I configured worker_container_repository & worker_container_tag as you did and ran with airflow master(using breeze) repository. The images were pulled correctly but I got errors that the dag I ran could not be found.

When you use breeze to start a kubernetes cluster, It loads the example dags. Now using your configuration and also making sure that what's in images.airflow.repository & images.airflow.tag corresponds to what workers are using, I got the same error:

BACKEND=postgresql                                                                                                                                                                                          │
│ DB_HOST=airflow-postgresql.airflow.svc.cluster.local                                                                                                                                                        │
│ DB_PORT=5432                                                                                                                                                                                                │
│ [2021-04-12 06:36:06,121] {settings.py:210} DEBUG - Setting up DB connection pool (PID 7)                                                                                                                   │
│ [2021-04-12 06:36:06,121] {settings.py:275} DEBUG - settings.prepare_engine_args(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=7                                             │
│ [2021-04-12 06:36:06,192] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f50eaa82670> to pre execution callback                                                               │
│ [2021-04-12 06:36:08,064] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7f50eaa82670>]                                                                           │
│ [2021-04-12 06:36:08,075] {settings.py:210} DEBUG - Setting up DB connection pool (PID 7)                                                                                                                   │
│ [2021-04-12 06:36:08,075] {settings.py:243} DEBUG - settings.prepare_engine_args(): Using NullPool                                                                                                          │
│ [2021-04-12 06:36:08,075] {dagbag.py:448} INFO - Filling up the DagBag from /opt/airflow/dags/example_bash_operator.py                                                                                      │
│ [2021-04-12 06:36:08,076] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []                                                                                                                          │
│ Traceback (most recent call last):        
File "/home/airflow/.local/bin/airflow", line 8, in <module>                                                                                                                                              │
│     sys.exit(main())                                                                                                                                                                                        │
│   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main                                                                                                             │
│     args.func(args)                                                                                                                                                                                         │
│   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command                                                                                                    │
│     return func(*args, **kwargs)                                                                                                                                                                            │
│   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper                                                                                                         │
│     return f(*args, **kwargs)                                                                                                                                                                               │
│   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 217, in task_run                                                                                       │
│     dag = get_dag(args.subdir, args.dag_id)                                                                                                                                                                 │
│   File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 187, in get_dag                                                                                                        │
│     raise AirflowException(                                                                                                                                                                                 │
│ airflow.exceptions.AirflowException: dag_id could not be found: example_bash_operator. Either the dag did not exist or it failed to parse.     
 [2021-04-12 06:36:08,076] {settings.py:292} DEBUG - Disposing DB connection pool (PID 7)                                                   

@mrpowerus
Copy link
Author

Thanks @ephraimbuddy. I am using my own helm chart. However, the error you're showing is not the same as the one I received. I'm confused about your statement. I thought that these config lines just indicated which container/tag Airflow uses to start for his worker?

@ephraimbuddy
Copy link
Contributor

Thanks @ephraimbuddy. I am using my own helm chart. However, the error you're showing is not the same as the one I received. I'm confused about your statement. I thought that these config lines just indicated which container/tag Airflow uses to start for his worker?

Yes. My error is specific to my setup which uses breeze. Can you maybe create a repo that can reproduce this using your own helm chart or just share how I can reproduce?

@mrpowerus
Copy link
Author

This issue on AKS seems to be related:
Azure/AKS#1052

@mrpowerus
Copy link
Author

mrpowerus commented Apr 21, 2021

It seems that adding the TCP keepalive in the config fixed the problem after all, which was obvious in hindsight. However, this was hard to find, due to no logging output.

@fashtop3
Copy link

I hade similar problem when we upgraded to version 2.x Pods get restarted even after the Dags ran successfully.

I later resolved it after a long time of debugging by overriding the pod template and specifying it in the airflow.cfg file.

```

[kubernetes]
....
pod_template_file = {{ .Values.airflow.home }}/pod_template.yaml
.....


# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
    - name: base
      image: dummy_image
      imagePullPolicy: IfNotPresent
      ports: []
      command: []

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
affected_version:2.0 Issues Reported for 2.0 kind:bug This is a clearly a bug provider:cncf-kubernetes Kubernetes provider related issues
Projects
None yet
Development

No branches or pull requests

5 participants