Skip to content

Commit

Permalink
Remove deprecations cncf.kubernetes (#43689)
Browse files Browse the repository at this point in the history
* change default namespace value to be None

* revert ui change

* remove deprecations from cncf.kubernetes

* add changelog

* add list of changes in changelog

* Update providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst

* fix tests

* fix tests

* update changelog, remove deprecation warnings from tests

* add todo comment for removing import

* remove kubernetes_pod as deprecated module

* remove unecessary test, add import for deprecation

* fix tests

* fix tests and imports

* fix tests

* add ignore re-def in correct place

---------

Co-authored-by: Elad Kalif <[email protected]>
  • Loading branch information
romsharon98 and eladkal authored Nov 9, 2024
1 parent b823f94 commit 6d85a04
Show file tree
Hide file tree
Showing 24 changed files with 61 additions and 822 deletions.
4 changes: 2 additions & 2 deletions airflow/cli/commands/kubernetes_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.cli import get_dag
Expand Down Expand Up @@ -59,7 +59,7 @@ def generate_pod_yaml(args):
pod = PodGenerator.construct_pod(
dag_id=args.dag_id,
task_id=ti.task_id,
pod_id=create_pod_id(args.dag_id, ti.task_id),
pod_id=create_unique_id(args.dag_id, ti.task_id),
try_number=ti.try_number,
kube_image=kube_config.kube_image,
date=ti.execution_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ Read more on termination-log `here <https://kubernetes.io/docs/tasks/debug/debug
KubernetesPodOperator callbacks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` supports different
The :class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator` supports different
callbacks that can be used to trigger actions during the lifecycle of the pod. In order to use them, you need to
create a subclass of :class:`~airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback` and override
the callbacks methods you want to use. Then you can pass your callback class to the operator using the ``callbacks``
Expand Down
18 changes: 0 additions & 18 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,24 +1201,6 @@ def test_changing_base_container_name_with_get_logs(self, mock_get_connection):
self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
assert self.expected_pod["spec"] == actual_pod["spec"]

def test_progess_call(self, mock_get_connection):
progress_callback = MagicMock()
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels=self.labels,
task_id=str(uuid4()),
in_cluster=False,
do_xcom_push=False,
get_logs=True,
progress_callback=progress_callback,
)
context = create_context(k)
k.execute(context)
progress_callback.assert_called()

def test_changing_base_container_name_no_logs(self, mock_get_connection):
"""
This test checks BOTH a modified base container name AND the get_logs=False flow,
Expand Down
6 changes: 4 additions & 2 deletions providers/src/airflow/providers/amazon/aws/operators/eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
try:
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
except ImportError:
# preserve backward compatibility for older versions of cncf.kubernetes provider
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
# preserve backward compatibility for older versions of cncf.kubernetes provider, remove this when minimum cncf.kubernetes provider is 10.0
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( # type: ignore[no-redef]
KubernetesPodOperator,
)

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
28 changes: 28 additions & 0 deletions providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ Changelog
main
.....

All deprecated classes, parameters and features have been removed from the Kubernetes provider package.
The following breaking changes were introduced:

* Helpers
* Remove ``add_pod_suffix`` method from ``kubernetes_helper_functions.py``. Use ``add_unique_suffix`` instead.
* Remove ``make_unique_pod_id`` method from ``PodGenerator``. Use ``add_unique_suffix`` in ``kubernetes_helper_functions`` instead.
* Remove ``create_pod_id`` method from ``kubernetes_helper_functions.py``. Use ``create_unique_id`` instead.
* Remove ``gen_pod`` method from ``PodGenerator``.
* Remove ``add_xcom_sidecar`` method from ``PodGenerator``. Use ``airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar`` instead.
* Remove the option to using a dictionary for the executor_config ``from_obj`` function in ``PodGenerator``. Use a ``kubernetes.client.models.V1Pod`` class with a "pod_override" key.
* Remove ``from_legacy_obj`` method from ``PodGenerator``.
* Remove ``airflow.providers.cncf.kubernetes.pod_launcher_deprecated`` module. Use ``airflow.providers.cncf.kubernetes.utils.pod_manager`` instead.

* Operators
* Remove ``airflow.providers.cncf.kubernetes.operators.kubernetes_pod``. Use ``airflow.providers.cncf.kubernetes.operators.pod`` instead.
* Remove ``is_delete_operator_pod`` parameters from ``KubernetesPodOperator``. Use ``on_finish_action`` instead.
* Remove ``progress_callback`` parameters from ``KubernetesPodOperator``. Use ``callbacks`` instead.
* Remove ``execute_complete`` method from ``KubernetesPodOperator``. Use ``trigger_reentry`` instead.
* Remove ``xcom_push`` parameter from ``SparkKubernetesOperator``. Use ``do_xcom_push``.

* Triggers
* Remove ``should_delete_pod`` parameter from ``KubernetesPodTrigger``. Use ``on_finish_action`` instead.

* Utils
* Remove ``progress_callback`` parameter from ``PodManager``.
* Remove ``follow_container_logs`` method from ``PodManager``. Use ``fetch_container_logs`` instead.


.. warning::
Set the default value of ``namespace`` in ``@task.kubernetes`` to ``None``, so it uses the cluster namespace when ``in_cluster`` is True. Be sure to specify a namespace when using this decorator. To retain the previous behavior, set ``namespace="default"``

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
from typing import TYPE_CHECKING

import pendulum
from deprecated import deprecated
from kubernetes.client.rest import ApiException
from slugify import slugify

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
Expand Down Expand Up @@ -62,22 +60,6 @@ def add_unique_suffix(*, name: str, rand_len: int = 8, max_len: int = POD_NAME_M
return name[: max_len - len(suffix)].strip("-.") + suffix


@deprecated(
reason="This function is deprecated. Please use `add_unique_suffix`",
category=AirflowProviderDeprecationWarning,
)
def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
"""
Add random string to pod name while staying under max length.
:param pod_name: name of the pod
:param rand_len: length of the random string to append
:param max_len: maximum length of the pod name
:meta private:
"""
return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len)


def create_unique_id(
dag_id: str | None = None,
task_id: str | None = None,
Expand Down Expand Up @@ -110,29 +92,6 @@ def create_unique_id(
return base_name


@deprecated(
reason="This function is deprecated. Please use `create_unique_id`.",
category=AirflowProviderDeprecationWarning,
)
def create_pod_id(
dag_id: str | None = None,
task_id: str | None = None,
*,
max_length: int = POD_NAME_MAX_LENGTH,
unique: bool = True,
) -> str:
"""
Generate unique pod ID given a dag_id and / or task_id.
:param dag_id: DAG ID
:param task_id: Task ID
:param max_length: max number of characters
:param unique: whether a random string suffix should be added
:return: A valid identifier for a kubernetes pod name
"""
return create_unique_id(dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique)


def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
"""Build a TaskInstanceKey based on pod annotations."""
log.debug("Creating task key for annotations %s", annotations)
Expand Down

This file was deleted.

32 changes: 3 additions & 29 deletions providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import re
import shlex
import string
import warnings
from collections.abc import Container, Mapping
from contextlib import AbstractContextManager
from enum import Enum
Expand All @@ -35,7 +34,6 @@

import kubernetes
import tenacity
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.client.exceptions import ApiException
from kubernetes.stream import stream
Expand All @@ -44,7 +42,6 @@
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
AirflowProviderDeprecationWarning,
AirflowSkipException,
TaskDeferred,
)
Expand Down Expand Up @@ -215,18 +212,12 @@ class KubernetesPodOperator(BaseOperator):
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod",
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
Deprecated - use `on_finish_action` instead.
:param termination_message_policy: The termination message policy of the base container.
Default value is "File"
:param active_deadline_seconds: The active_deadline_seconds which translates to active_deadline_seconds
in V1PodSpec.
:param callbacks: KubernetesPodOperatorCallback instance contains the callbacks methods on different step
of KubernetesPodOperator.
:param progress_callback: Callback function for receiving k8s container logs.
`progress_callback` is deprecated, please use :param `callbacks` instead.
:param logging_interval: max time in seconds that task should be in deferred state before
resuming to fetch the latest logs. If ``None``, then the task will remain in deferred state until pod
is done, and no logs will be visible until that time.
Expand Down Expand Up @@ -404,19 +395,8 @@ def __init__(
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
self.log_pod_spec_on_failure = log_pod_spec_on_failure
if is_delete_operator_pod is not None:
warnings.warn(
"`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
self.on_finish_action = (
OnFinishAction.DELETE_POD if is_delete_operator_pod else OnFinishAction.KEEP_POD
)
self.is_delete_operator_pod = is_delete_operator_pod
else:
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
self.termination_message_policy = termination_message_policy
self.active_deadline_seconds = active_deadline_seconds
self.logging_interval = logging_interval
Expand Down Expand Up @@ -512,9 +492,7 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool

@cached_property
def pod_manager(self) -> PodManager:
return PodManager(
kube_client=self.client, callbacks=self.callbacks, progress_callback=self._progress_callback
)
return PodManager(kube_client=self.client, callbacks=self.callbacks)

@cached_property
def hook(self) -> PodOperatorHookProtocol:
Expand Down Expand Up @@ -1161,10 +1139,6 @@ def dry_run(self) -> None:
pod = self.build_pod_request_obj()
print(yaml.dump(prune_dict(pod.to_dict(), mode="strict")))

@deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning)
def execute_complete(self, context: Context, event: dict, **kwargs):
return self.trigger_reentry(context=context, event=event)

def process_duplicate_label_pods(self, pod_list: list[k8s.V1Pod]) -> k8s.V1Pod:
"""
Patch or delete the existing pod with duplicate labels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ def __init__(
random_name_suffix: bool = True,
**kwargs,
) -> None:
if kwargs.get("xcom_push") is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(name=name, **kwargs)
self.image = image
self.code_path = code_path
Expand Down
Loading

0 comments on commit 6d85a04

Please sign in to comment.