Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Kubernetes library version and backport pickle-fix for Loggers #18797

Merged
merged 1 commit into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions airflow/providers/cncf/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,30 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys

if sys.version_info < (3, 7):
# This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that
# v1.Pod et al. are not pickleable on Python 3.6.

# Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this
# method.

# This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means
# that we can update the version used in this provider and have it work for older versions
import copyreg
import logging

def _reduce_Logger(logger):
if logging.getLogger(logger.name) is not logger:
import pickle

raise pickle.PicklingError('logger cannot be pickled')
return logging.getLogger, (logger.name,)

def _reduce_RootLogger(logger):
return logging.getLogger, ()

if logging.Logger not in copyreg.dispatch_table:
copyreg.pickle(logging.Logger, _reduce_Logger)
copyreg.pickle(logging.RootLogger, _reduce_RootLogger)
12 changes: 9 additions & 3 deletions airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import time
from contextlib import closing
from datetime import datetime
from typing import Iterable, Optional, Tuple, Union
from typing import TYPE_CHECKING, Iterable, Optional, Tuple, Union

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models.v1_event_list import V1EventList
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
Expand All @@ -38,6 +37,13 @@
from airflow.kubernetes.pod_generator import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
try:
# Kube >= 19
from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList
except ImportError:
from kubernetes.client.models.v1_event_list import V1EventList


class PodLaunchFailedException(AirflowException):
"""When pod launching fails in KubernetesPodOperator."""
Expand Down Expand Up @@ -293,7 +299,7 @@ def read_pod_logs(
raise

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def read_pod_events(self, pod: V1Pod) -> V1EventList:
def read_pod_events(self, pod: V1Pod) -> "V1EventList":
"""Reads events from the POD"""
try:
return self._client.list_namespaced_event(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
]
kubernetes = [
'cryptography>=2.0.0',
'kubernetes>=3.0.0, <12.0.0',
'kubernetes>=3.0.0',
]
kylin = ['kylinpy>=2.6']
ldap = [
Expand Down
6 changes: 5 additions & 1 deletion tests/kubernetes/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,9 @@ def test_disable_verify_ssl(self):

_disable_verify_ssl()

configuration = Configuration()
# Support wide range of kube client libraries
if hasattr(Configuration, 'get_default_copy'):
configuration = Configuration.get_default_copy()
else:
configuration = Configuration()
ashb marked this conversation as resolved.
Show resolved Hide resolved
self.assertFalse(configuration.verify_ssl)