Move all k8s classes to cncf.kubernetes provider #32767

Merged: 2 commits, Jul 26, 2023
1 change: 0 additions & 1 deletion .github/CODEOWNERS
@@ -11,7 +11,6 @@

# Kubernetes
/airflow/kubernetes/ @dstandish @jedcunningham
/airflow/kubernetes_executor_templates/ @dstandish @jedcunningham
/airflow/executors/celery_kubernetes_executor.py @dstandish @jedcunningham
/airflow/executors/kubernetes_executor.py @dstandish @jedcunningham

6 changes: 0 additions & 6 deletions .github/boring-cyborg.yml
@@ -164,19 +164,13 @@ labelPRBasedOnFilePath:
- airflow/**/kubernetes_*.py
- airflow/example_dags/example_kubernetes_executor.py
- airflow/providers/cncf/kubernetes/**/*
- airflow/kubernetes/**/*
- airflow/kubernetes_executor_templates/**/*
- airflow/executors/kubernetes_executor.py
- airflow/providers/celery/executors/celery_kubernetes_executor.py
- docs/apache-airflow/core-concepts/executor/kubernetes.rst
- docs/apache-airflow/core-concepts/executor/celery_kubernetes.rst
- docs/apache-airflow-providers-cncf-kubernetes/**/*
- kubernetes_tests/**/*
- tests/providers/cncf/kubernetes/**/*
- tests/system/providers/cncf/kubernetes/**/*
- tests/kubernetes/**/*
- tests/executors/kubernetes_executor_template_files/**/*
- tests/executors/*kubernetes*.py

area:API:
- airflow/api/**/*
16 changes: 16 additions & 0 deletions .pre-commit-config.yaml
@@ -333,6 +333,22 @@ repos:
pass_filenames: false
entry: ./scripts/ci/pre_commit/pre_commit_check_order_setup.py
additional_dependencies: ['rich>=12.4.4']
- id: check-airflow-k8s-not-used
name: Check airflow.kubernetes imports are not used
language: python
files: ^airflow/.*\.py$
require_serial: true
exclude: ^airflow/kubernetes/
entry: ./scripts/ci/pre_commit/pre_commit_check_airflow_k8s_not_used.py
additional_dependencies: ['rich>=12.4.4']
- id: check-cncf-k8s-only-for-executors
name: Check cncf.kubernetes imports used for executors only
language: python
files: ^airflow/.*\.py$
require_serial: true
exclude: ^airflow/kubernetes/|^airflow/providers/
entry: ./scripts/ci/pre_commit/pre_commit_check_cncf_k8s_used_for_k8s_executor_only.py
additional_dependencies: ['rich>=12.4.4']
- id: check-extra-packages-references
name: Checks setup extra packages
description: Checks if all the libraries in setup.py are listed in extra-packages-ref.rst file
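For readers unfamiliar with these hooks: the new `check-airflow-k8s-not-used` check fails the commit if core Airflow code imports the legacy `airflow.kubernetes` package. A minimal sketch of what such a check could do is below; it is illustrative only, and the actual implementation lives in `scripts/ci/pre_commit/pre_commit_check_airflow_k8s_not_used.py` (not shown in this diff) and may differ in detail. Pre-commit passes the matched file names on the command line.

```python
# Hypothetical sketch of a check in the spirit of check-airflow-k8s-not-used.
# The real script in this PR may differ; this only shows the idea.
from __future__ import annotations

import re
import sys

LEGACY_IMPORT = re.compile(r"^\s*(from|import)\s+airflow\.kubernetes\b")


def main(files: list[str]) -> int:
    errors: list[str] = []
    for path in files:
        with open(path, encoding="utf-8") as f:
            for lineno, line in enumerate(f, start=1):
                if LEGACY_IMPORT.match(line):
                    errors.append(f"{path}:{lineno}: {line.strip()}")
    if errors:
        print("Found forbidden imports of airflow.kubernetes:")
        print("\n".join(errors))
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
```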
4 changes: 4 additions & 0 deletions STATIC_CODE_CHECKS.rst
@@ -146,6 +146,8 @@ require Breeze Docker image to be built locally.
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-aiobotocore-optional | Check if aiobotocore is an optional dependency only | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-airflow-k8s-not-used | Check airflow.kubernetes imports are not used | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-airflow-provider-compatibility | Check compatibility of Providers with Airflow | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-apache-license-rat | Check if licenses are OK for Apache | |
@@ -163,6 +165,8 @@
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-changelog-has-no-duplicates | Check changelogs for duplicate entries | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-cncf-k8s-only-for-executors | Check cncf.kubernetes imports used for executors only | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-core-deprecation-classes | Verify usage of Airflow deprecation classes in core | |
+-----------------------------------------------------------+--------------------------------------------------------------+---------+
| check-daysago-import-from-utils | Make sure days_ago is imported from airflow.utils.dates | |
10 changes: 5 additions & 5 deletions airflow/cli/commands/kubernetes_command.py
@@ -25,12 +25,12 @@
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from airflow.executors.kubernetes_executor import KubeConfig
from airflow.kubernetes import pod_generator
from airflow.kubernetes.kube_client import get_kube_client
from airflow.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.models import DagRun, TaskInstance
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.cli import get_dag
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
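The CLI now imports the pod utilities from the provider package instead of `airflow.kubernetes`. For third-party code that must keep working across both layouts during the transition, a defensive import along these lines (illustrative, not part of this PR) bridges the two module paths:

```python
# Illustrative fallback import for code that has to run before and after
# the move of pod_generator into the cncf.kubernetes provider.
try:
    from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
except ImportError:  # older layout without the relocated provider module
    from airflow.kubernetes.pod_generator import PodGenerator
```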
2 changes: 1 addition & 1 deletion airflow/config_templates/__init__.py
@@ -25,4 +25,4 @@
},
}

add_deprecated_classes(__deprecated_classes, __name__)
add_deprecated_classes(__deprecated_classes, __name__, {}, "The `celery` provider must be >= 3.3.0 for that.")
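The extra argument appends a hint to the deprecation message emitted for moved names. Conceptually, `add_deprecated_classes` installs a module-level `__getattr__` that warns and redirects old attribute access to the new provider location. The following is a simplified, hypothetical illustration of that idea; the real helper is `airflow.utils.deprecation_tools.add_deprecated_classes`, and the mapping below is made up for the example.

```python
# Hypothetical, simplified illustration of what add_deprecated_classes does:
# accessing an old name on this module emits a DeprecationWarning and returns
# the object from its new provider location (PEP 562 module __getattr__).
import importlib
import warnings

_MOVED = {
    "DEFAULT_CELERY_CONFIG": "airflow.providers.celery.executors.default_celery",
}


def __getattr__(name: str):
    if name in _MOVED:
        warnings.warn(
            f"airflow.config_templates.{name} is deprecated; import it from "
            f"{_MOVED[name]} instead. The `celery` provider must be >= 3.3.0 for that.",
            DeprecationWarning,
            stacklevel=2,
        )
        module = importlib.import_module(_MOVED[name])
        return getattr(module, name)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```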
210 changes: 0 additions & 210 deletions airflow/config_templates/config.yml
@@ -1943,21 +1943,6 @@ sentry:
type: string
example: ~
default: ~
local_kubernetes_executor:
description: |
This section only applies if you are using the ``LocalKubernetesExecutor`` in
``[core]`` section above
options:
kubernetes_queue:
description: |
Define when to send a task to ``KubernetesExecutor`` when using ``LocalKubernetesExecutor``.
When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``),
the task is executed via ``KubernetesExecutor``,
otherwise via ``LocalExecutor``
version_added: 2.3.0
type: string
example: ~
default: "kubernetes"
scheduler:
description: ~
options:
@@ -2442,201 +2427,6 @@ elasticsearch_configs:
type: string
example: ~
default: "True"
kubernetes_executor:
description: ~
renamed:
previous_name: kubernetes
version: 2.5.0
options:
api_client_retry_configuration:
description: |
Kwargs to override the default urllib3 Retry used in the kubernetes API client
version_added: 2.6.0
type: string
example: '{ "total": 3, "backoff_factor": 0.5 }'
default: ""
logs_task_metadata:
description: |
Flag to control the information added to kubernetes executor logs for better traceability
version_added: 2.7.0
type: boolean
example: ~
default: "False"
pod_template_file:
description: |
Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
version_added: 1.10.11
type: string
example: ~
default: ""
see_also: ":ref:`concepts:pod_template_file`"
worker_container_repository:
description: |
The repository of the Kubernetes Image for the Worker to Run
version_added: ~
type: string
example: ~
default: ""
worker_container_tag:
description: |
The tag of the Kubernetes Image for the Worker to Run
version_added: ~
type: string
example: ~
default: ""
namespace:
description: |
The Kubernetes namespace where airflow workers should be created. Defaults to ``default``
version_added: ~
type: string
example: ~
default: "default"
delete_worker_pods:
description: |
If True, all worker pods will be deleted upon termination
version_added: ~
type: string
example: ~
default: "True"
delete_worker_pods_on_failure:
description: |
If False (and delete_worker_pods is True),
failed worker pods will not be deleted so users can investigate them.
This only prevents removal of worker pods where the worker itself failed,
not when the task it ran failed.
version_added: 1.10.11
type: string
example: ~
default: "False"
worker_pods_creation_batch_size:
description: |
Number of Kubernetes Worker Pod creation calls per scheduler loop.
Note that the current default of "1" will only launch a single pod
per-heartbeat. It is HIGHLY recommended that users increase this
number to match the tolerance of their kubernetes cluster for
better performance.
version_added: 1.10.3
type: string
example: ~
default: "1"
multi_namespace_mode:
description: |
Allows users to launch pods in multiple namespaces.
Will require creating a cluster-role for the scheduler,
or use multi_namespace_mode_namespace_list configuration.
version_added: 1.10.12
type: boolean
example: ~
default: "False"
multi_namespace_mode_namespace_list:
description: |
If multi_namespace_mode is True while scheduler does not have a cluster-role,
give the list of namespaces where the scheduler will schedule jobs
Scheduler needs to have the necessary permissions in these namespaces.
version_added: 2.6.0
type: string
example: ~
default: ""
in_cluster:
description: |
Use the service account kubernetes gives to pods to connect to kubernetes cluster.
It's intended for clients that expect to be running inside a pod running on kubernetes.
It will raise an exception if called from a process not running in a kubernetes environment.
version_added: ~
type: string
example: ~
default: "True"
cluster_context:
description: |
When running with in_cluster=False change the default cluster_context or config_file
options to Kubernetes client. Leave blank these to use default behaviour like ``kubectl`` has.
version_added: 1.10.3
type: string
example: ~
default: ~
config_file:
description: |
Path to the kubernetes configfile to be used when ``in_cluster`` is set to False
version_added: 1.10.3
type: string
example: ~
default: ~
kube_client_request_args:
description: |
Keyword parameters to pass while calling a kubernetes client core_v1_api methods
from Kubernetes Executor provided as a single line formatted JSON dictionary string.
List of supported params are similar for all core_v1_apis, hence a single config
variable for all apis. See:
https://raw.githubusercontent.com/kubernetes-client/python/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/api/core_v1_api.py
version_added: 1.10.4
type: string
example: ~
default: ""
delete_option_kwargs:
description: |
Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client
``core_v1_api`` method when using the Kubernetes Executor.
This should be an object and can contain any of the options listed in the ``v1DeleteOptions``
class defined here:
https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
version_added: 1.10.12
type: string
example: '{"grace_period_seconds": 10}'
default: ""
enable_tcp_keepalive:
description: |
Enables TCP keepalive mechanism. This prevents Kubernetes API requests to hang indefinitely
when idle connection is time-outed on services like cloud load balancers or firewalls.
version_added: 2.0.0
type: boolean
example: ~
default: "True"
tcp_keep_idle:
description: |
When the `enable_tcp_keepalive` option is enabled, TCP probes a connection that has
been idle for `tcp_keep_idle` seconds.
version_added: 2.0.0
type: integer
example: ~
default: "120"
tcp_keep_intvl:
description: |
When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
to a keepalive probe, TCP retransmits the probe after `tcp_keep_intvl` seconds.
version_added: 2.0.0
type: integer
example: ~
default: "30"
tcp_keep_cnt:
description: |
When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
to a keepalive probe, TCP retransmits the probe `tcp_keep_cnt number` of times before
a connection is considered to be broken.
version_added: 2.0.0
type: integer
example: ~
default: "6"
verify_ssl:
description: |
Set this to false to skip verifying SSL certificate of Kubernetes python client.
version_added: 2.1.0
type: boolean
example: ~
default: "True"
worker_pods_queued_check_interval:
description: |
How often in seconds to check for task instances stuck in "queued" status without a pod
version_added: 2.2.0
type: integer
example: ~
default: "60"
ssl_ca_cert:
description: |
Path to a CA certificate to be used by the Kubernetes client to verify the server's SSL certificate.
version_added: 2.6.0
type: string
example: ~
default: ""
sensors:
description: ~
options:
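Removing these blocks from the core `config.yml` does not change how the options are addressed: the `[kubernetes_executor]` and `[local_kubernetes_executor]` sections are now contributed by the `cncf.kubernetes` provider, so existing `conf.get(...)` calls and `AIRFLOW__KUBERNETES_EXECUTOR__...` environment variables keep working, assuming the provider is installed. A small sketch:

```python
# Reading a [kubernetes_executor] option is unchanged for users; the section is
# now supplied by the cncf.kubernetes provider's configuration rather than the
# core config.yml removed above.  Sketch only; assumes the provider is installed.
from airflow.configuration import conf

namespace = conf.get("kubernetes_executor", "namespace", fallback="default")
pod_template_file = conf.get("kubernetes_executor", "pod_template_file", fallback="")
print(namespace, pod_template_file)
```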
9 changes: 5 additions & 4 deletions airflow/configuration.py
@@ -46,6 +46,7 @@
from airflow.exceptions import AirflowConfigException
from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, BaseSecretsBackend
from airflow.utils import yaml
from airflow.utils.empty_set import _get_empty_set_for_configuration
from airflow.utils.module_loading import import_string
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.weight_rule import WeightRule
@@ -66,8 +67,6 @@

ENV_VAR_PREFIX = "AIRFLOW__"

EMPTY_SET: Set[tuple[str, str]] = set() # noqa: UP006


def _parse_sqlite_version(s: str) -> tuple[int, ...]:
match = _SQLITE3_VERSION_PATTERN.match(s)
@@ -299,7 +298,9 @@ def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
@functools.cached_property
def sensitive_config_values(self) -> Set[tuple[str, str]]: # noqa: UP006
if self.configuration_description is None:
return EMPTY_SET.copy() # we can't use set() here because set is defined below # ¯\_(ツ)_/¯
return (
_get_empty_set_for_configuration()
) # we can't use set() here because set is defined below # ¯\_(ツ)_/¯
flattened = {
(s, k): item
for s, s_c in self.configuration_description.items()
@@ -2313,6 +2314,6 @@ def initialize_auth_manager() -> BaseAuthManager:
FERNET_KEY = "" # Set only if needed when generating a new file
WEBSERVER_CONFIG = "" # Set by initialize_config

conf = initialize_config()
conf: AirflowConfigParser = initialize_config()
secrets_backend_list = initialize_secrets_backends()
conf.validate()
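The module-level `EMPTY_SET` constant is replaced by a helper imported from `airflow.utils.empty_set`, because the built-in name `set` is shadowed further down in `configuration.py`, as the retained comment notes. The new helper file is not shown in this diff; its presumed shape is as small as this sketch:

```python
# Presumed contents of airflow/utils/empty_set.py (the new file is not part of
# this diff).  It exists only so configuration.py can obtain a fresh empty set
# even though the built-in name `set` is shadowed inside that module.
from __future__ import annotations


def _get_empty_set_for_configuration() -> set[tuple[str, str]]:
    """Return a new, empty set of (section, key) tuples."""
    return set()
```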
2 changes: 1 addition & 1 deletion airflow/decorators/__init__.pyi
@@ -33,8 +33,8 @@ from airflow.decorators.python_virtualenv import virtualenv_task
from airflow.decorators.sensor import sensor_task
from airflow.decorators.short_circuit import short_circuit_task
from airflow.decorators.task_group import task_group
from airflow.kubernetes.secret import Secret
from airflow.models.dag import dag
from airflow.providers.cncf.kubernetes.secret import Secret

# Please keep this in sync with __init__.py's __all__.
__all__ = [
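The type stub now resolves `Secret` from the provider package. DAG code constructs it the same way as before; an illustrative example (not part of this diff, parameter names follow the provider's documented usage) follows:

```python
# Illustrative construction of the relocated Secret class.
from airflow.providers.cncf.kubernetes.secret import Secret

secret_env = Secret(
    deploy_type="env",          # expose as an environment variable
    deploy_target="SQL_CONN",   # variable name inside the worker pod
    secret="airflow-secrets",   # Kubernetes Secret resource to read from
    key="sql_alchemy_conn",     # key within that Secret
)
```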
5 changes: 2 additions & 3 deletions airflow/example_dags/example_kubernetes_executor.py
@@ -32,9 +32,6 @@

log = logging.getLogger(__name__)

worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository")
worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag")

try:
from kubernetes.client import models as k8s
except ImportError:
@@ -163,6 +160,8 @@ def other_namespace_task():
print_stuff()

other_ns_task = other_namespace_task()
worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository")
worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag")

# You can also change the base image, here we used the worker image for demonstration.
# Note that the image must have the same configuration as the
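The two `conf.get` calls move from the top of the example down to the code that consumes them, next to the base-image override. Below is a sketch of how such values typically feed a `pod_override` in `executor_config`, mirroring the surrounding example DAG; it assumes the `worker_container_repository` and `worker_container_tag` variables read above, and the task name is illustrative.

```python
# Sketch of feeding the worker image settings into a task's executor_config,
# in the style of the surrounding example DAG (assumptions noted in the lead-in).
from kubernetes.client import models as k8s

from airflow.decorators import task

executor_config_with_worker_image = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    image=f"{worker_container_repository}:{worker_container_tag}",
                )
            ]
        )
    )
}


@task(executor_config=executor_config_with_worker_image)
def base_image_override_task():
    print("running in a pod built from the worker image")
```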