From 0d1dc457ab3eb553fc73a993033b2d8a40c3ec0c Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Thu, 7 Oct 2021 10:19:27 +0100 Subject: [PATCH] Update Kubernetes library version Previously we pinned this library to versions below v12, as a change to Kube library internals meant v1.Pod objects now have a logger object inside them, and couldn't be pickled on Python 3.6. To fix that we have "backported" the change in Python 3.7 to make Logger objects be pickled "by name". (In Python 3.7 the change adds `__reduce__` methods to the Logger and RootLogger objects, but here we achieve it with the `copyreg` stdlib module so we don't monkeypatch anything.) This fix is also applied to Airflow core in a separate commit, but we also apply it here in the provider so that the cncf.kubernetes provider's client library can be updated but still used with older versions of Airflow that don't include this fix. --- airflow/providers/cncf/kubernetes/__init__.py | 27 +++++++++++++++++++ .../cncf/kubernetes/utils/pod_manager.py | 12 ++++++--- setup.py | 2 +- tests/kubernetes/test_client.py | 6 ++++- 4 files changed, 42 insertions(+), 5 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/__init__.py b/airflow/providers/cncf/kubernetes/__init__.py index 217e5db960782..0998e31143fc8 100644 --- a/airflow/providers/cncf/kubernetes/__init__.py +++ b/airflow/providers/cncf/kubernetes/__init__.py @@ -15,3 +15,30 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import sys + +if sys.version_info < (3, 7): + # This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that + # v1.Pod et al. are not pickleable on Python 3.6. + + # Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this + # method. 
+ + # This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means + # that we can update the version used in this provider and have it work for older versions + import copyreg + import logging + + def _reduce_Logger(logger): + if logging.getLogger(logger.name) is not logger: + import pickle + + raise pickle.PicklingError('logger cannot be pickled') + return logging.getLogger, (logger.name,) + + def _reduce_RootLogger(logger): + return logging.getLogger, () + + if logging.Logger not in copyreg.dispatch_table: + copyreg.pickle(logging.Logger, _reduce_Logger) + copyreg.pickle(logging.RootLogger, _reduce_RootLogger) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 70b5b4c2ebaed..3640eb2e5da9d 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -20,12 +20,11 @@ import time from contextlib import closing from datetime import datetime -from typing import Iterable, Optional, Tuple, Union +from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union import pendulum import tenacity from kubernetes import client, watch -from kubernetes.client.models.v1_event_list import V1EventList from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.rest import ApiException from kubernetes.stream import stream as kubernetes_stream @@ -38,6 +37,13 @@ from airflow.kubernetes.pod_generator import PodDefaults from airflow.utils.log.logging_mixin import LoggingMixin +if TYPE_CHECKING: + try: + # Kube >= 19 + from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList + except ImportError: + from kubernetes.client.models.v1_event_list import V1EventList + class PodLaunchFailedException(AirflowException): """When pod launching fails in KubernetesPodOperator.""" @@ -293,7 +299,7 @@ def read_pod_logs( raise 
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) - def read_pod_events(self, pod: V1Pod) -> V1EventList: + def read_pod_events(self, pod: V1Pod) -> "V1EventList": """Reads events from the POD""" try: return self._client.list_namespaced_event( diff --git a/setup.py b/setup.py index 2b4771ee007d4..c9d4d0ba87d8d 100644 --- a/setup.py +++ b/setup.py @@ -381,7 +381,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version ] kubernetes = [ 'cryptography>=2.0.0', - 'kubernetes>=3.0.0, <12.0.0', + 'kubernetes>=3.0.0', ] kylin = ['kylinpy>=2.6'] ldap = [ diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py index bf5dcfc3d0a2d..9228e9b704a54 100644 --- a/tests/kubernetes/test_client.py +++ b/tests/kubernetes/test_client.py @@ -63,5 +63,9 @@ def test_disable_verify_ssl(self): _disable_verify_ssl() - configuration = Configuration() + # Support wide range of kube client libraries + if hasattr(Configuration, 'get_default_copy'): + configuration = Configuration.get_default_copy() + else: + configuration = Configuration() self.assertFalse(configuration.verify_ssl)