From 6d85a0466d91d501af87c8904b902ea92cee466d Mon Sep 17 00:00:00 2001 From: rom sharon <33751805+romsharon98@users.noreply.github.com> Date: Sat, 9 Nov 2024 22:43:40 +0200 Subject: [PATCH] Remove deprecations cncf.kubernetes (#43689) * change default namespace value to be None * revert ui change * remove deprecations from cncf.kubernetes * add changelog * add list of changes in changelog * Update providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst * fix tests * fix tests * update changelog, remove deprecation warnings from tests * add todo comment for removing import * remove kubernetes_pod as deprecated module * remove unecessary test, add import for deprecation * fix tests * fix tests and imports * fix tests * add ignore re-def in correct place --------- Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> --- airflow/cli/commands/kubernetes_command.py | 4 +- .../operators.rst | 2 +- .../test_kubernetes_pod_operator.py | 18 - .../providers/amazon/aws/operators/eks.py | 6 +- .../providers/cncf/kubernetes/CHANGELOG.rst | 28 ++ .../kubernetes/kubernetes_helper_functions.py | 41 --- .../kubernetes/operators/kubernetes_pod.py | 31 -- .../cncf/kubernetes/operators/pod.py | 32 +- .../kubernetes/operators/spark_kubernetes.py | 2 - .../cncf/kubernetes/pod_generator.py | 118 ------- .../kubernetes/pod_launcher_deprecated.py | 320 ------------------ .../kubernetes/triggers/kubernetes_pod.py | 31 -- .../providers/cncf/kubernetes/triggers/pod.py | 23 +- .../cncf/kubernetes/utils/pod_manager.py | 23 +- .../executors/test_kubernetes_executor.py | 7 +- .../cncf/kubernetes/models/test_secret.py | 5 +- .../cncf/kubernetes/operators/test_pod.py | 35 -- .../test_kubernetes_helper_functions.py | 21 +- .../cncf/kubernetes/test_pod_generator.py | 105 +----- .../cncf/kubernetes/triggers/test_pod.py | 1 - .../cncf/kubernetes/utils/test_pod_manager.py | 16 - .../cloud/triggers/test_kubernetes_engine.py | 8 +- .../run_provider_yaml_files_check.py | 2 - tests/always/test_project_structure.py | 4 - 24 files changed, 61 insertions(+), 822 deletions(-) delete mode 100644 providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py delete mode 100644 providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py delete mode 100644 providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py index 2a6fccf14d18..eab11133c9dd 100644 --- a/airflow/cli/commands/kubernetes_command.py +++ b/airflow/cli/commands/kubernetes_command.py @@ -31,7 +31,7 @@ from airflow.providers.cncf.kubernetes import pod_generator from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig from airflow.providers.cncf.kubernetes.kube_client import get_kube_client -from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.utils import cli as cli_utils, yaml from airflow.utils.cli import get_dag @@ -59,7 +59,7 @@ def generate_pod_yaml(args): pod = PodGenerator.construct_pod( dag_id=args.dag_id, task_id=ti.task_id, - pod_id=create_pod_id(args.dag_id, ti.task_id), + pod_id=create_unique_id(args.dag_id, ti.task_id), try_number=ti.try_number, kube_image=kube_config.kube_image, date=ti.execution_date, diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst index 2268a8655c98..dfc54e092fe0 100644 --- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst +++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst @@ -206,7 +206,7 @@ Read more on termination-log `here str: - """ - Add random string to pod name while staying under max length. - - :param pod_name: name of the pod - :param rand_len: length of the random string to append - :param max_len: maximum length of the pod name - :meta private: - """ - return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len) - - def create_unique_id( dag_id: str | None = None, task_id: str | None = None, @@ -110,29 +92,6 @@ def create_unique_id( return base_name -@deprecated( - reason="This function is deprecated. Please use `create_unique_id`.", - category=AirflowProviderDeprecationWarning, -) -def create_pod_id( - dag_id: str | None = None, - task_id: str | None = None, - *, - max_length: int = POD_NAME_MAX_LENGTH, - unique: bool = True, -) -> str: - """ - Generate unique pod ID given a dag_id and / or task_id. - - :param dag_id: DAG ID - :param task_id: Task ID - :param max_length: max number of characters - :param unique: whether a random string suffix should be added - :return: A valid identifier for a kubernetes pod name - """ - return create_unique_id(dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique) - - def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey: """Build a TaskInstanceKey based on pod annotations.""" log.debug("Creating task key for annotations %s", annotations) diff --git a/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py deleted file mode 100644 index 3b3e6f7d952d..000000000000 --- a/providers/src/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""This module is deprecated. Please use :mod:`airflow.providers.cncf.kubernetes.operators.pod` instead.""" - -from __future__ import annotations - -import warnings - -from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.providers.cncf.kubernetes.operators.pod import * # noqa: F403 - -warnings.warn( - "This module is deprecated. Please use `airflow.providers.cncf.kubernetes.operators.pod` instead.", - AirflowProviderDeprecationWarning, - stacklevel=2, -) diff --git a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py index 62f08439d416..2c99126230c3 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -26,7 +26,6 @@ import re import shlex import string -import warnings from collections.abc import Container, Mapping from contextlib import AbstractContextManager from enum import Enum @@ -35,7 +34,6 @@ import kubernetes import tenacity -from deprecated import deprecated from kubernetes.client import CoreV1Api, V1Pod, models as k8s from kubernetes.client.exceptions import ApiException from kubernetes.stream import stream @@ -44,7 +42,6 @@ from airflow.configuration import conf from airflow.exceptions import ( AirflowException, - AirflowProviderDeprecationWarning, AirflowSkipException, TaskDeferred, ) @@ -215,18 +212,12 @@ class KubernetesPodOperator(BaseOperator): :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. - :param is_delete_operator_pod: What to do when the pod reaches its final - state, or the execution is interrupted. If True (default), delete the - pod; if False, leave the pod. - Deprecated - use `on_finish_action` instead. :param termination_message_policy: The termination message policy of the base container. Default value is "File" :param active_deadline_seconds: The active_deadline_seconds which translates to active_deadline_seconds in V1PodSpec. :param callbacks: KubernetesPodOperatorCallback instance contains the callbacks methods on different step of KubernetesPodOperator. - :param progress_callback: Callback function for receiving k8s container logs. - `progress_callback` is deprecated, please use :param `callbacks` instead. :param logging_interval: max time in seconds that task should be in deferred state before resuming to fetch the latest logs. If ``None``, then the task will remain in deferred state until pod is done, and no logs will be visible until that time. @@ -404,19 +395,8 @@ def __init__( self.poll_interval = poll_interval self.remote_pod: k8s.V1Pod | None = None self.log_pod_spec_on_failure = log_pod_spec_on_failure - if is_delete_operator_pod is not None: - warnings.warn( - "`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - self.on_finish_action = ( - OnFinishAction.DELETE_POD if is_delete_operator_pod else OnFinishAction.KEEP_POD - ) - self.is_delete_operator_pod = is_delete_operator_pod - else: - self.on_finish_action = OnFinishAction(on_finish_action) - self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD + self.on_finish_action = OnFinishAction(on_finish_action) + self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD self.termination_message_policy = termination_message_policy self.active_deadline_seconds = active_deadline_seconds self.logging_interval = logging_interval @@ -512,9 +492,7 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool @cached_property def pod_manager(self) -> PodManager: - return PodManager( - kube_client=self.client, callbacks=self.callbacks, progress_callback=self._progress_callback - ) + return PodManager(kube_client=self.client, callbacks=self.callbacks) @cached_property def hook(self) -> PodOperatorHookProtocol: @@ -1161,10 +1139,6 @@ def dry_run(self) -> None: pod = self.build_pod_request_obj() print(yaml.dump(prune_dict(pod.to_dict(), mode="strict"))) - @deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning) - def execute_complete(self, context: Context, event: dict, **kwargs): - return self.trigger_reentry(context=context, event=event) - def process_duplicate_label_pods(self, pod_list: list[k8s.V1Pod]) -> k8s.V1Pod: """ Patch or delete the existing pod with duplicate labels. diff --git a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index acab68d85ff3..ad529290d8ed 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -98,8 +98,6 @@ def __init__( random_name_suffix: bool = True, **kwargs, ) -> None: - if kwargs.get("xcom_push") is not None: - raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") super().__init__(name=name, **kwargs) self.image = image self.code_path = code_path diff --git a/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py b/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py index cf11db539e38..78838d9b4447 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py +++ b/providers/src/airflow/providers/cncf/kubernetes/pod_generator.py @@ -34,23 +34,16 @@ import re2 from dateutil import parser -from deprecated import deprecated from kubernetes.client import models as k8s from kubernetes.client.api_client import ApiClient from airflow.exceptions import ( AirflowConfigException, AirflowException, - AirflowProviderDeprecationWarning, ) from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( POD_NAME_MAX_LENGTH, add_unique_suffix, - rand_str, -) -from airflow.providers.cncf.kubernetes.pod_generator_deprecated import ( - PodDefaults as PodDefaultsDeprecated, - PodGenerator as PodGeneratorDeprecated, ) from airflow.utils import yaml from airflow.utils.hashlib_wrapper import md5 @@ -155,40 +148,6 @@ def __init__( # Attach sidecar self.extract_xcom = extract_xcom - @deprecated( - reason="This method is deprecated and will be removed in the future releases", - category=AirflowProviderDeprecationWarning, - ) - def gen_pod(self) -> k8s.V1Pod: - """Generate pod.""" - result = self.ud_pod - - result.metadata.name = add_unique_suffix(name=result.metadata.name) - - if self.extract_xcom: - result = self.add_xcom_sidecar(result) - - return result - - @staticmethod - @deprecated( - reason=( - "This function is deprecated. " - "Please use airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead" - ), - category=AirflowProviderDeprecationWarning, - ) - def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: - """Add sidecar.""" - pod_cp = copy.deepcopy(pod) - pod_cp.spec.volumes = pod.spec.volumes or [] - pod_cp.spec.volumes.insert(0, PodDefaultsDeprecated.VOLUME) - pod_cp.spec.containers[0].volume_mounts = pod_cp.spec.containers[0].volume_mounts or [] - pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaultsDeprecated.VOLUME_MOUNT) - pod_cp.spec.containers.append(PodDefaultsDeprecated.SIDECAR_CONTAINER) - - return pod_cp - @staticmethod def from_obj(obj) -> dict | k8s.V1Pod | None: """Convert to pod from obj.""" @@ -210,57 +169,11 @@ def from_obj(obj) -> dict | k8s.V1Pod | None: if isinstance(k8s_object, k8s.V1Pod): return k8s_object - elif isinstance(k8s_legacy_object, dict): - warnings.warn( - "Using a dictionary for the executor_config is deprecated and will soon be removed. " - 'Please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key' - " instead. ", - category=AirflowProviderDeprecationWarning, - stacklevel=2, - ) - return PodGenerator.from_legacy_obj(obj) else: raise TypeError( "Cannot convert a non-kubernetes.client.models.V1Pod object into a KubernetesExecutorConfig" ) - @staticmethod - def from_legacy_obj(obj) -> k8s.V1Pod | None: - """Convert to pod from obj.""" - if obj is None: - return None - - # We do not want to extract constant here from ExecutorLoader because it is just - # A name in dictionary rather than executor selection mechanism and it causes cyclic import - namespaced = obj.get("KubernetesExecutor", {}) - - if not namespaced: - return None - - resources = namespaced.get("resources") - - if resources is None: - requests = { - "cpu": namespaced.pop("request_cpu", None), - "memory": namespaced.pop("request_memory", None), - "ephemeral-storage": namespaced.get("ephemeral-storage"), # We pop this one in limits - } - limits = { - "cpu": namespaced.pop("limit_cpu", None), - "memory": namespaced.pop("limit_memory", None), - "ephemeral-storage": namespaced.pop("ephemeral-storage", None), - } - all_resources = list(requests.values()) + list(limits.values()) - if all(r is None for r in all_resources): - resources = None - else: - # remove None's so they don't become 0's - requests = {k: v for k, v in requests.items() if v is not None} - limits = {k: v for k, v in limits.items() if v is not None} - resources = k8s.V1ResourceRequirements(requests=requests, limits=limits) - namespaced["resources"] = resources - return PodGeneratorDeprecated(**namespaced).gen_pod() - @staticmethod def reconcile_pods(base_pod: k8s.V1Pod, client_pod: k8s.V1Pod | None) -> k8s.V1Pod: """ @@ -579,37 +492,6 @@ def deserialize_model_dict(pod_dict: dict | None) -> k8s.V1Pod: api_client = ApiClient() return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod) - @staticmethod - @deprecated( - reason="This method is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.", - category=AirflowProviderDeprecationWarning, - ) - def make_unique_pod_id(pod_id: str) -> str | None: - r""" - Generate a unique Pod name. - - Kubernetes pod names must consist of one or more lowercase - rfc1035/rfc1123 labels separated by '.' with a maximum length of 253 - characters. - - Name must pass the following regex for validation - ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`` - - For more details, see: - https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md - - :param pod_id: requested pod name - :return: ``str`` valid Pod name of appropriate length - """ - if not pod_id: - return None - - max_pod_id_len = 100 # arbitrarily chosen - suffix = rand_str(8) # 8 seems good enough - base_pod_id_len = max_pod_id_len - len(suffix) - 1 # -1 for separator - trimmed_pod_id = pod_id[:base_pod_id_len].rstrip("-.") - return f"{trimmed_pod_id}-{suffix}" - def merge_objects(base_obj, client_obj): """ diff --git a/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py b/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py deleted file mode 100644 index e93be85e368d..000000000000 --- a/providers/src/airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py +++ /dev/null @@ -1,320 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Launches pods.""" - -from __future__ import annotations - -import json -import math -import time -import warnings -from typing import TYPE_CHECKING, cast - -import pendulum -import tenacity -from kubernetes import client, watch -from kubernetes.client.rest import ApiException -from kubernetes.stream import stream as kubernetes_stream -from requests.exceptions import HTTPError - -from airflow.exceptions import AirflowException, RemovedInAirflow3Warning -from airflow.providers.cncf.kubernetes.kube_client import get_kube_client -from airflow.providers.cncf.kubernetes.pod_generator import PodDefaultsDeprecated -from airflow.settings import pod_mutation_hook -from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.state import State - -if TYPE_CHECKING: - from kubernetes.client.models.v1_pod import V1Pod - -warnings.warn( - """ - Please use :mod: Please use `airflow.providers.cncf.kubernetes.utils.pod_manager` - - To use this module install the provider package by installing this pip package: - - https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/ - - """, - RemovedInAirflow3Warning, - stacklevel=2, -) - - -class PodStatus: - """Status of the pods.""" - - PENDING = "pending" - RUNNING = "running" - FAILED = "failed" - SUCCEEDED = "succeeded" - - -class PodLauncher(LoggingMixin): - """ - Deprecated class for launching pods. - - Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead. - """ - - def __init__( - self, - kube_client: client.CoreV1Api = None, - in_cluster: bool = True, - cluster_context: str | None = None, - extract_xcom: bool = False, - ): - """ - Launch pods; DEPRECATED. - - Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager - instead to create the launcher. - - :param kube_client: kubernetes client - :param in_cluster: whether we are in cluster - :param cluster_context: context of the cluster - :param extract_xcom: whether we should extract xcom - """ - super().__init__() - self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context) - self._watch = watch.Watch() - self.extract_xcom = extract_xcom - - def run_pod_async(self, pod: V1Pod, **kwargs): - """Run pod asynchronously.""" - pod_mutation_hook(pod) - - sanitized_pod = self._client.api_client.sanitize_for_serialization(pod) - json_pod = json.dumps(sanitized_pod, indent=2) - - self.log.debug("Pod Creation Request: \n%s", json_pod) - try: - resp = self._client.create_namespaced_pod( - body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs - ) - self.log.debug("Pod Creation Response: %s", resp) - except Exception as e: - self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod) - raise e - return resp - - def delete_pod(self, pod: V1Pod): - """Delete pod.""" - try: - self._client.delete_namespaced_pod( - pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions() - ) - except ApiException as e: - # If the pod is already deleted - if str(e.status) != "404": - raise - - def start_pod(self, pod: V1Pod, startup_timeout: int = 120): - """ - Launch the pod synchronously and wait for completion. - - :param pod: - :param startup_timeout: Timeout for startup of the pod (if pod is pending for too long, fails task) - :return: - """ - resp = self.run_pod_async(pod) - start_time = time.monotonic() - if resp.status.start_time is None: - while self.pod_not_started(pod): - self.log.warning("Pod not yet started: %s", pod.metadata.name) - if time.monotonic() >= start_time + startup_timeout: - raise AirflowException("Pod took too long to start") - time.sleep(1) - - def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]: - """ - Monitor a pod and return the final state. - - :param pod: pod spec that will be monitored - :param get_logs: whether to read the logs locally - """ - if get_logs: - read_logs_since_sec = None - last_log_time: pendulum.DateTime | None = None - while True: - logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec) - for line in logs: - timestamp, message = self.parse_log_line(line.decode("utf-8")) - if timestamp: - last_log_time = cast(pendulum.DateTime, pendulum.parse(timestamp)) - self.log.info(message) - time.sleep(1) - - if not self.base_container_is_running(pod): - break - - self.log.warning("Pod %s log read interrupted", pod.metadata.name) - if last_log_time: - delta = pendulum.now() - last_log_time - # Prefer logs duplication rather than loss - read_logs_since_sec = math.ceil(delta.total_seconds()) - result = None - if self.extract_xcom: - while self.base_container_is_running(pod): - self.log.info("Container %s has state %s", pod.metadata.name, State.RUNNING) - time.sleep(2) - result = self._extract_xcom(pod) - self.log.info(result) - result = json.loads(result) - while self.pod_is_running(pod): - self.log.info("Pod %s has state %s", pod.metadata.name, State.RUNNING) - time.sleep(2) - return self._task_status(self.read_pod(pod)), result - - def parse_log_line(self, line: str) -> tuple[str | None, str]: - """ - Parse K8s log line and returns the final state. - - :param line: k8s log line - :return: timestamp and log message - """ - timestamp, sep, message = line.strip().partition(" ") - if not sep: - self.log.error( - "Error parsing timestamp (no timestamp in message: %r). " - "Will continue execution but won't update timestamp", - line, - ) - return None, line - return timestamp, message - - def _task_status(self, event): - self.log.info("Event: %s had an event of type %s", event.metadata.name, event.status.phase) - status = self.process_status(event.metadata.name, event.status.phase) - return status - - def pod_not_started(self, pod: V1Pod): - """Test if pod has not started.""" - state = self._task_status(self.read_pod(pod)) - return state == State.QUEUED - - def pod_is_running(self, pod: V1Pod): - """Test if pod is running.""" - state = self._task_status(self.read_pod(pod)) - return state not in (State.SUCCESS, State.FAILED) - - def base_container_is_running(self, pod: V1Pod): - """Test if base container is running.""" - event = self.read_pod(pod) - status = next((s for s in event.status.container_statuses if s.name == "base"), None) - if not status: - return False - return status.state.running is not None - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) - def read_pod_logs( - self, - pod: V1Pod, - tail_lines: int | None = None, - timestamps: bool = False, - since_seconds: int | None = None, - ): - """Read log from the pod.""" - additional_kwargs = {} - if since_seconds: - additional_kwargs["since_seconds"] = since_seconds - - if tail_lines: - additional_kwargs["tail_lines"] = tail_lines - - try: - return self._client.read_namespaced_pod_log( - name=pod.metadata.name, - namespace=pod.metadata.namespace, - container="base", - follow=True, - timestamps=timestamps, - _preload_content=False, - **additional_kwargs, - ) - except HTTPError as e: - raise AirflowException(f"There was an error reading the kubernetes API: {e}") - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) - def read_pod_events(self, pod): - """Read events from the pod.""" - try: - return self._client.list_namespaced_event( - namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}" - ) - except HTTPError as e: - raise AirflowException(f"There was an error reading the kubernetes API: {e}") - - @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True) - def read_pod(self, pod: V1Pod): - """Read pod information.""" - try: - return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace) - except HTTPError as e: - raise AirflowException(f"There was an error reading the kubernetes API: {e}") - - def _extract_xcom(self, pod: V1Pod): - resp = kubernetes_stream( - self._client.connect_get_namespaced_pod_exec, - pod.metadata.name, - pod.metadata.namespace, - container=PodDefaultsDeprecated.SIDECAR_CONTAINER_NAME, - command=["/bin/sh"], - stdin=True, - stdout=True, - stderr=True, - tty=False, - _preload_content=False, - ) - try: - result = self._exec_pod_command(resp, f"cat {PodDefaultsDeprecated.XCOM_MOUNT_PATH}/return.json") - self._exec_pod_command(resp, "kill -s SIGINT 1") - finally: - resp.close() - if result is None: - raise AirflowException(f"Failed to extract xcom from pod: {pod.metadata.name}") - return result - - def _exec_pod_command(self, resp, command): - if resp.is_open(): - self.log.info("Running command... %s\n", command) - resp.write_stdin(command + "\n") - while resp.is_open(): - resp.update(timeout=1) - if resp.peek_stdout(): - return resp.read_stdout() - if resp.peek_stderr(): - self.log.info(resp.read_stderr()) - break - return None - - def process_status(self, job_id, status): - """Process status information for the job.""" - status = status.lower() - if status == PodStatus.PENDING: - return State.QUEUED - elif status == PodStatus.FAILED: - self.log.error("Event with job id %s Failed", job_id) - return State.FAILED - elif status == PodStatus.SUCCEEDED: - self.log.info("Event with job id %s Succeeded", job_id) - return State.SUCCESS - elif status == PodStatus.RUNNING: - return State.RUNNING - else: - self.log.error("Event: Invalid state %s on job %s", status, job_id) - return State.FAILED diff --git a/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py b/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py deleted file mode 100644 index 17ddc9aee8fa..000000000000 --- a/providers/src/airflow/providers/cncf/kubernetes/triggers/kubernetes_pod.py +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""This module is deprecated. Please use :mod:`airflow.providers.cncf.kubernetes.triggers.pod` instead.""" - -from __future__ import annotations - -import warnings - -from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.providers.cncf.kubernetes.triggers.pod import * # noqa: F403 - -warnings.warn( - "This module is deprecated. Please use `airflow.providers.cncf.kubernetes.triggers.pod` instead.", - AirflowProviderDeprecationWarning, - stacklevel=2, -) diff --git a/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py index 102dc438006b..51b86647f36b 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -19,12 +19,10 @@ import asyncio import datetime import traceback -import warnings from enum import Enum from functools import cached_property from typing import TYPE_CHECKING, Any, AsyncIterator -from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook from airflow.providers.cncf.kubernetes.utils.pod_manager import ( OnFinishAction, @@ -71,10 +69,6 @@ class KubernetesPodTrigger(BaseTrigger): :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. - :param should_delete_pod: What to do when the pod reaches its final - state, or the execution is interrupted. If True (default), delete the - pod; if False, leave the pod. - Deprecated - use `on_finish_action` instead. :param logging_interval: number of seconds to wait before kicking it back to the operator to print latest logs. If ``None`` will wait until container done. :param last_log_time: where to resume logs from @@ -95,7 +89,6 @@ def __init__( startup_timeout: int = 120, startup_check_interval: int = 5, on_finish_action: str = "delete_pod", - should_delete_pod: bool | None = None, last_log_time: DateTime | None = None, logging_interval: int | None = None, ): @@ -114,20 +107,7 @@ def __init__( self.startup_check_interval = startup_check_interval self.last_log_time = last_log_time self.logging_interval = logging_interval - - if should_delete_pod is not None: - warnings.warn( - "`should_delete_pod` parameter is deprecated, please use `on_finish_action`", - category=AirflowProviderDeprecationWarning, - stacklevel=2, - ) - self.on_finish_action = ( - OnFinishAction.DELETE_POD if should_delete_pod else OnFinishAction.KEEP_POD - ) - self.should_delete_pod = should_delete_pod - else: - self.on_finish_action = OnFinishAction(on_finish_action) - self.should_delete_pod = self.on_finish_action == OnFinishAction.DELETE_POD + self.on_finish_action = OnFinishAction(on_finish_action) self._since_time = None @@ -148,7 +128,6 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "startup_timeout": self.startup_timeout, "startup_check_interval": self.startup_check_interval, "trigger_start_time": self.trigger_start_time, - "should_delete_pod": self.should_delete_pod, "on_finish_action": self.on_finish_action.value, "last_log_time": self.last_log_time, "logging_interval": self.logging_interval, diff --git a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index cd91dc09281f..aa8e812921c3 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -26,11 +26,10 @@ from contextlib import closing, suppress from dataclasses import dataclass from datetime import timedelta -from typing import TYPE_CHECKING, Callable, Generator, Protocol, cast +from typing import TYPE_CHECKING, Generator, Protocol, cast import pendulum import tenacity -from deprecated import deprecated from kubernetes import client, watch from kubernetes.client.rest import ApiException from kubernetes.stream import stream as kubernetes_stream @@ -39,7 +38,7 @@ from typing_extensions import Literal from urllib3.exceptions import HTTPError, TimeoutError -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning +from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils.log.logging_mixin import LoggingMixin @@ -302,19 +301,15 @@ def __init__( self, kube_client: client.CoreV1Api, callbacks: type[KubernetesPodOperatorCallback] | None = None, - progress_callback: Callable[[str], None] | None = None, ): """ Create the launcher. :param kube_client: kubernetes client :param callbacks: - :param progress_callback: Callback function invoked when fetching container log. - This parameter is deprecated, please use ```` """ super().__init__() self._client = kube_client - self._progress_callback = progress_callback self._watch = watch.Watch() self._callbacks = callbacks @@ -383,16 +378,6 @@ def await_pod_start( raise PodLaunchFailedException(msg) time.sleep(startup_check_interval) - @deprecated( - reason=( - "Method `follow_container_logs` is deprecated. Use `fetch_container_logs` instead " - "with option `follow=True`." - ), - category=AirflowProviderDeprecationWarning, - ) - def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingStatus: - return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True) - def fetch_container_logs( self, pod: V1Pod, @@ -461,8 +446,6 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None progress_callback_lines.append(line) else: # previous log line is complete for line in progress_callback_lines: - if self._progress_callback: - self._progress_callback(line) if self._callbacks: self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC @@ -479,8 +462,6 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None finally: # log the last line and update the last_captured_timestamp for line in progress_callback_lines: - if self._progress_callback: - self._progress_callback(line) if self._callbacks: self._callbacks.progress_callback( line=line, client=self._client, mode=ExecutionMode.SYNC diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index ea143edd8298..f5db8b806af4 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -28,7 +28,7 @@ from kubernetes.client.rest import ApiException from urllib3 import HTTPResponse -from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning +from airflow.exceptions import AirflowException from airflow.executors.executor_constants import ( CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR, @@ -52,12 +52,12 @@ get_base_pod_from_template, ) from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import ( + add_unique_suffix, annotations_for_logging_task_metadata, annotations_to_key, create_unique_id, get_logs_task_metadata, ) -from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.utils import timezone from airflow.utils.state import State, TaskInstanceState @@ -106,8 +106,7 @@ def _is_safe_label_value(value): def test_create_pod_id(self): for dag_id, task_id in self._cases(): - with pytest.warns(AirflowProviderDeprecationWarning, match=r"deprecated\. Use `add_pod_suffix`"): - pod_name = PodGenerator.make_unique_pod_id(create_unique_id(dag_id, task_id)) + pod_name = add_unique_suffix(name=create_unique_id(dag_id, task_id)) assert self._is_valid_pod_id(pod_name), f"dag_id={dag_id!r}, task_id={task_id!r}" @mock.patch("airflow.providers.cncf.kubernetes.pod_generator.PodGenerator") diff --git a/providers/tests/cncf/kubernetes/models/test_secret.py b/providers/tests/cncf/kubernetes/models/test_secret.py index 51fd17ca71f5..1f2995296875 100644 --- a/providers/tests/cncf/kubernetes/models/test_secret.py +++ b/providers/tests/cncf/kubernetes/models/test_secret.py @@ -62,12 +62,9 @@ def test_only_mount_sub_secret(self, mock_uuid): ) @mock.patch("uuid.uuid4") - @mock.patch("airflow.providers.cncf.kubernetes.pod_generator.rand_str") - def test_attach_to_pod(self, mock_rand_str, mock_uuid, data_file): + def test_attach_to_pod(self, mock_uuid, data_file): static_uuid = uuid.UUID("cf4a56d2-8101-4217-b027-2af6216feb48") mock_uuid.return_value = static_uuid - rand_str = "abcd1234" - mock_rand_str.return_value = rand_str template_file = data_file("pods/generator_base.yaml").as_posix() pod = PodGenerator(pod_template_file=template_file).ud_pod secrets = [ diff --git a/providers/tests/cncf/kubernetes/operators/test_pod.py b/providers/tests/cncf/kubernetes/operators/test_pod.py index a39b1c6d8893..d6b7378c80c8 100644 --- a/providers/tests/cncf/kubernetes/operators/test_pod.py +++ b/providers/tests/cncf/kubernetes/operators/test_pod.py @@ -31,7 +31,6 @@ from airflow.exceptions import ( AirflowException, - AirflowProviderDeprecationWarning, AirflowSkipException, TaskDeferred, ) @@ -1305,31 +1304,6 @@ def test_wait_for_xcom_sidecar_iff_push_xcom(self, mock_await, mock_extract_xcom else: mock_await.assert_not_called() - @pytest.mark.parametrize( - "on_finish_action", - # Regardless what we provide in `on_finish_action` - # it doesn't take any affect if `is_delete_operator_pod` provided. - [*sorted(OnFinishAction.__members__.values()), None], - ) - @pytest.mark.parametrize( - "is_delete_operator_pod, expected_on_finish_action", - [ - pytest.param(True, "delete_pod", id="delete-operator-pod"), - pytest.param(False, "keep_pod", id="keep-operator-pod"), - ], - ) - def test_deprecated_is_delete_operator_pod( - self, is_delete_operator_pod, expected_on_finish_action, on_finish_action - ): - with pytest.warns(AirflowProviderDeprecationWarning, match="please use `on_finish_action`"): - op = KubernetesPodOperator( - task_id="task", - is_delete_operator_pod=is_delete_operator_pod, - on_finish_action=on_finish_action, - ) - assert op.is_delete_operator_pod == is_delete_operator_pod - assert op.on_finish_action == expected_on_finish_action - @pytest.mark.parametrize( "task_kwargs, should_fail, should_be_deleted", [ @@ -2272,15 +2246,6 @@ def test_trigger_error(self, find_pod, cleanup, mock_write_log): }, ) - def test_deprecated_execute_complete(self): - fake_context = mock.sentinel.context - fake_event = mock.sentinel.event - with mock.patch.object(KubernetesPodOperator, "trigger_reentry") as mocked_trigger_reentry: - op = KubernetesPodOperator(task_id="test-task") - with pytest.warns(AirflowProviderDeprecationWarning, match="use `trigger_reentry` instead"): - op.execute_complete(fake_context, fake_event) - mocked_trigger_reentry.assert_called_once_with(context=fake_context, event=fake_event) - @pytest.mark.parametrize("do_xcom_push", [True, False]) @patch(KUB_OP_PATH.format("extract_xcom")) diff --git a/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py b/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py index bfe4f98f3e14..5dce063d572c 100644 --- a/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py +++ b/providers/tests/cncf/kubernetes/test_kubernetes_helper_functions.py @@ -18,12 +18,10 @@ from __future__ import annotations import re -from unittest import mock import pytest -from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id, create_unique_id +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id pod_name_regex = r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$" @@ -110,20 +108,3 @@ def test_create_pod_id(self, length, unique): assert re.match(r"-[a-z0-9]{8}", actual[-9:]) else: assert actual == base[:length] - - @pytest.mark.parametrize("dag_id", ["fake-dag", None]) - @pytest.mark.parametrize("task_id", ["fake-task", None]) - @pytest.mark.parametrize("max_length", [10, 42, None]) - @pytest.mark.parametrize("unique", [True, False]) - def test_back_compat_create_pod_id(self, dag_id, task_id, max_length, unique): - with mock.patch( - "airflow.providers.cncf.kubernetes.kubernetes_helper_functions.create_unique_id" - ) as mocked_create_unique_id: - with pytest.warns( - AirflowProviderDeprecationWarning, match=r"deprecated. Please use `create_unique_id`" - ): - create_pod_id(dag_id, task_id, max_length=max_length, unique=unique) - - mocked_create_unique_id.assert_called_once_with( - dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique - ) diff --git a/providers/tests/cncf/kubernetes/test_pod_generator.py b/providers/tests/cncf/kubernetes/test_pod_generator.py index 1a18693852a3..30d1df2a0d73 100644 --- a/providers/tests/cncf/kubernetes/test_pod_generator.py +++ b/providers/tests/cncf/kubernetes/test_pod_generator.py @@ -26,10 +26,10 @@ from kubernetes.client import ApiClient, models as k8s from airflow import __version__ -from airflow.exceptions import AirflowConfigException, AirflowProviderDeprecationWarning +from airflow.exceptions import AirflowConfigException from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import PodReconciliationError +from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import add_unique_suffix from airflow.providers.cncf.kubernetes.pod_generator import ( - PodDefaultsDeprecated, PodGenerator, datetime_to_label_safe_datestring, extend_object_field, @@ -160,41 +160,6 @@ def setup_method(self): ), ) - @mock.patch("airflow.providers.cncf.kubernetes.kubernetes_helper_functions.rand_str") - def test_gen_pod_extract_xcom(self, mock_rand_str, data_file): - """ - Method gen_pod is used nowhere in codebase and is deprecated. - This test is only retained for backcompat. - """ - mock_rand_str.return_value = self.rand_str - template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix() - - pod_generator = PodGenerator(pod_template_file=template_file, extract_xcom=True) - with pytest.warns(AirflowProviderDeprecationWarning): - result = pod_generator.gen_pod() - container_two = { - "name": "airflow-xcom-sidecar", - "image": "alpine", - "command": ["sh", "-c", PodDefaultsDeprecated.XCOM_CMD], - "volumeMounts": [{"name": "xcom", "mountPath": "/airflow/xcom"}], - "resources": {"requests": {"cpu": "1m"}}, - } - self.expected.spec.containers.append(container_two) - base_container: k8s.V1Container = self.expected.spec.containers[0] - base_container.volume_mounts = base_container.volume_mounts or [] - base_container.volume_mounts.append(k8s.V1VolumeMount(name="xcom", mount_path="/airflow/xcom")) - self.expected.spec.containers[0] = base_container - self.expected.spec.volumes = self.expected.spec.volumes or [] - self.expected.spec.volumes.append( - k8s.V1Volume( - name="xcom", - empty_dir={}, - ) - ) - result_dict = self.k8s_client.sanitize_for_serialization(result) - expected_dict = self.k8s_client.sanitize_for_serialization(self.expected) - assert result_dict == expected_dict - def test_from_obj_pod_override_object(self): obj = { "pod_override": k8s.V1Pod( @@ -240,54 +205,6 @@ def test_from_obj_pod_override_object(self): }, } - def test_from_obj_legacy(self): - obj = { - "KubernetesExecutor": { - "annotations": {"test": "annotation"}, - "volumes": [ - { - "name": "example-kubernetes-test-volume", - "hostPath": {"path": "/tmp/"}, - }, - ], - "volume_mounts": [ - { - "mountPath": "/foo/", - "name": "example-kubernetes-test-volume", - }, - ], - } - } - with pytest.warns( - AirflowProviderDeprecationWarning, - match="Using a dictionary for the executor_config is deprecated and will soon be removed", - ): - result = PodGenerator.from_obj(obj) - - assert self.k8s_client.sanitize_for_serialization(result) == { - "apiVersion": "v1", - "kind": "Pod", - "metadata": { - "annotations": {"test": "annotation"}, - }, - "spec": { - "containers": [ - { - "args": [], - "command": [], - "env": [], - "envFrom": [], - "name": "base", - "ports": [], - "volumeMounts": [{"mountPath": "/foo/", "name": "example-kubernetes-test-volume"}], - } - ], - "hostNetwork": False, - "imagePullSecrets": [], - "volumes": [{"hostPath": {"path": "/tmp/"}, "name": "example-kubernetes-test-volume"}], - }, - } - def test_from_obj_both(self): obj = { "pod_override": k8s.V1Pod( @@ -725,16 +642,13 @@ def test_deserialize_non_existent_model_file(self, caplog, tmp_path): ), ) def test_pod_name_confirm_to_max_length(self, input): - with pytest.warns( - AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in `kubernetes_helper_functions`" - ): - actual = PodGenerator.make_unique_pod_id(input) - assert len(actual) <= 100 + actual = add_unique_suffix(name=input) + assert len(actual) <= 63 actual_base, actual_suffix = actual.rsplit("-", maxsplit=1) # we limit pod id length to 100 # random suffix is 8 chars plus the '-' separator - # so actual pod id base should first 91 chars of requested pod id - assert actual_base == input[:91] + # so actual pod id base should first 55 chars of requested pod id + assert actual_base == input[:54] # suffix should always be 8, the random alphanum assert re.match(r"^[a-z0-9]{8}$", actual_suffix) @@ -743,7 +657,7 @@ def test_pod_name_confirm_to_max_length(self, input): ( ( "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen-", - "somewhat-long-pod-name-maybe-longer-than-previously-supported-with-hyphen", + "somewhat-long-pod-name-maybe-longer-than-previously-su", ), ("pod-name-with-hyphen-", "pod-name-with-hyphen"), ("pod-name-with-double-hyphen--", "pod-name-with-double-hyphen"), @@ -759,10 +673,7 @@ def test_pod_name_is_valid(self, pod_id, expected_starts_with): `make_unique_pod_id` doesn't actually guarantee that the regex passes for any input. But I guess this test verifies that an otherwise valid pod_id doesn't get _screwed up_. """ - with pytest.warns( - AirflowProviderDeprecationWarning, match="Use `add_pod_suffix` in `kubernetes_helper_functions`" - ): - actual = PodGenerator.make_unique_pod_id(pod_id) + actual = add_unique_suffix(name=pod_id) assert len(actual) <= 253 assert actual == actual.lower(), "not lowercase" # verify using official k8s regex diff --git a/providers/tests/cncf/kubernetes/triggers/test_pod.py b/providers/tests/cncf/kubernetes/triggers/test_pod.py index 0e7522480a86..9b3a21d023f1 100644 --- a/providers/tests/cncf/kubernetes/triggers/test_pod.py +++ b/providers/tests/cncf/kubernetes/triggers/test_pod.py @@ -108,7 +108,6 @@ def test_serialize(self, trigger): "startup_check_interval": 5, "trigger_start_time": TRIGGER_START_TIME, "on_finish_action": ON_FINISH_ACTION, - "should_delete_pod": ON_FINISH_ACTION == "delete_pod", "last_log_time": None, "logging_interval": None, } diff --git a/providers/tests/cncf/kubernetes/utils/test_pod_manager.py b/providers/tests/cncf/kubernetes/utils/test_pod_manager.py index b577ea969ea3..d220befc2191 100644 --- a/providers/tests/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/tests/cncf/kubernetes/utils/test_pod_manager.py @@ -49,12 +49,10 @@ class TestPodManager: def setup_method(self): - self.mock_progress_callback = mock.Mock() self.mock_kube_client = mock.Mock() self.pod_manager = PodManager( kube_client=self.mock_kube_client, callbacks=MockKubernetesPodOperatorCallback, - progress_callback=self.mock_progress_callback, ) def test_read_pod_logs_successfully_returns_logs(self): @@ -294,19 +292,6 @@ def test_fetch_container_logs_returning_last_timestamp( assert status.last_log_time == cast("DateTime", pendulum.parse(timestamp_string)) - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") - def test_fetch_container_logs_invoke_deprecated_progress_callback( - self, mock_read_pod_logs, mock_container_is_running - ): - message = "2020-10-08T14:16:17.793417674Z message" - no_ts_message = "notimestamp" - mock_read_pod_logs.return_value = [bytes(message, "utf-8"), bytes(no_ts_message, "utf-8")] - mock_container_is_running.return_value = False - - self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) - self.mock_progress_callback.assert_has_calls([mock.call(message), mock.call(no_ts_message)]) - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") def test_fetch_container_logs_invoke_progress_callback( @@ -352,7 +337,6 @@ def consumer_iter(): mock_container_is_running.side_effect = [True, True, False] status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) assert status.last_log_time == cast("DateTime", pendulum.parse(last_timestamp_string)) - assert self.mock_progress_callback.call_count == expected_call_count assert mock_callbacks.progress_callback.call_count == expected_call_count @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") diff --git a/providers/tests/google/cloud/triggers/test_kubernetes_engine.py b/providers/tests/google/cloud/triggers/test_kubernetes_engine.py index 844e2f96332b..076ce1837102 100644 --- a/providers/tests/google/cloud/triggers/test_kubernetes_engine.py +++ b/providers/tests/google/cloud/triggers/test_kubernetes_engine.py @@ -27,7 +27,13 @@ from google.cloud.container_v1.types import Operation from kubernetes.client import models as k8s -from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import ContainerState +try: + from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState +except ImportError: + # preserve backward compatibility for older versions of cncf.kubernetes provider, remove this when minimum cncf.kubernetes provider is 10.0 + from airflow.providers.cncf.kubernetes.triggers.kubernetes_pod import ( # type: ignore[no-redef] + ContainerState, + ) from airflow.providers.google.cloud.triggers.kubernetes_engine import ( GKEJobTrigger, GKEOperationTrigger, diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py index ab5ebcac697b..7d5756b2e7ef 100755 --- a/scripts/in_container/run_provider_yaml_files_check.py +++ b/scripts/in_container/run_provider_yaml_files_check.py @@ -48,8 +48,6 @@ DEPRECATED_MODULES = [ "airflow.providers.apache.hdfs.sensors.hdfs", "airflow.providers.apache.hdfs.hooks.hdfs", - "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod", - "airflow.providers.cncf.kubernetes.operators.kubernetes_pod", "airflow.providers.tabular.hooks.tabular", "airflow.providers.yandex.hooks.yandexcloud_dataproc", "airflow.providers.yandex.operators.yandexcloud_dataproc", diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 683fa7ce797c..78534accd18b 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -555,10 +555,6 @@ class TestElasticsearchProviderProjectStructure(ExampleCoverageTest): class TestCncfProviderProjectStructure(ExampleCoverageTest): PROVIDER = "cncf" CLASS_DIRS = ProjectStructureTest.CLASS_DIRS - DEPRECATED_CLASSES = { - "airflow.providers.cncf.kubernetes.operators.kubernetes_pod", - "airflow.providers.cncf.kubernetes.triggers.kubernetes_pod", - } BASE_CLASSES = {"airflow.providers.cncf.kubernetes.operators.resource.KubernetesResourceBaseOperator"}