diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index e305268a6b28e..1d458a8c362eb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2565,6 +2565,13 @@ kubernetes_executor: previous_name: kubernetes version: 2.5.0 options: + api_client_retry_configuration: + description: | + Kwargs to override the default urllib3 Retry used in the kubernetes API client + version_added: 2.6.0 + type: string + example: '{ "total": 3, "backoff_factor": 0.5 }' + default: "" pod_template_file: description: | Path to the YAML pod file that forms the basis for KubernetesExecutor workers. diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 7f440ec1995da..69393e014e6c5 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -1282,6 +1282,10 @@ use_ssl = False verify_certs = True [kubernetes_executor] +# Kwargs to override the default urllib3 Retry used in the kubernetes API client +# Example: api_client_retry_configuration = {{ "total": 3, "backoff_factor": 0.5 }} +api_client_retry_configuration = + # Path to the YAML pod file that forms the basis for KubernetesExecutor workers. pod_template_file = diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py index 7e887ae1ac4bb..eb3912db3c7fa 100644 --- a/airflow/kubernetes/kube_client.py +++ b/airflow/kubernetes/kube_client.py @@ -19,6 +19,8 @@ import logging +import urllib3.util + from airflow.configuration import conf log = logging.getLogger(__name__) @@ -107,16 +109,27 @@ def get_kube_client( if conf.getboolean("kubernetes_executor", "enable_tcp_keepalive"): _enable_tcp_keepalive() + configuration = _get_default_configuration() + api_client_retry_configuration = conf.getjson("kubernetes", "api_client_retry_configuration", fallback={}) + + if not conf.getboolean("kubernetes_executor", "verify_ssl"): + _disable_verify_ssl() + + if isinstance(api_client_retry_configuration, dict): + configuration.retries = urllib3.util.Retry(**api_client_retry_configuration) + else: + raise ValueError("api_client_retry_configuration should be a dictionary") + if in_cluster: - config.load_incluster_config() + config.load_incluster_config(client_configuration=configuration) else: if cluster_context is None: cluster_context = conf.get("kubernetes_executor", "cluster_context", fallback=None) if config_file is None: config_file = conf.get("kubernetes_executor", "config_file", fallback=None) - config.load_kube_config(config_file=config_file, context=cluster_context) - - configuration = _get_default_configuration() + config.load_kube_config( + config_file=config_file, context=cluster_context, client_configuration=configuration + ) if not conf.getboolean("kubernetes_executor", "verify_ssl"): configuration.verify_ssl = False diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py index 26eda47952591..7157653e8c63f 100644 --- a/tests/kubernetes/test_client.py +++ b/tests/kubernetes/test_client.py @@ -23,6 +23,7 @@ from urllib3.connection import HTTPConnection, HTTPSConnection from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client +from tests.test_utils.config import conf_vars class TestClient: @@ -42,6 +43,7 @@ def test_load_file_config(self, config): @mock.patch("airflow.kubernetes.kube_client.conf") def test_load_config_disable_ssl(self, conf, config): conf.getboolean.return_value = False + conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5} client = get_kube_client(in_cluster=False) conf.getboolean.assert_called_with("kubernetes_executor", "verify_ssl") assert not client.api_client.configuration.verify_ssl @@ -50,6 +52,7 @@ def test_load_config_disable_ssl(self, conf, config): @mock.patch("airflow.kubernetes.kube_client.conf") def test_load_config_ssl_ca_cert(self, conf, config): conf.get.return_value = "/path/to/ca.crt" + conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5} client = get_kube_client(in_cluster=False) conf.get.assert_called_with("kubernetes_executor", "ssl_ca_cert") assert client.api_client.configuration.ssl_ca_cert == "/path/to/ca.crt" @@ -81,3 +84,11 @@ def test_disable_verify_ssl(self): else: configuration = Configuration() assert not configuration.verify_ssl + + @mock.patch("kubernetes.config.incluster_config.InClusterConfigLoader") + @conf_vars({("kubernetes", "api_client_retry_configuration"): '{"total": 3, "backoff_factor": 0.5}'}) + def test_api_client_retry_configuration_correct_values(self, mock_in_cluster_loader): + get_kube_client(in_cluster=True) + client_configuration = mock_in_cluster_loader().load_and_set.call_args[0][0] + assert client_configuration.retries.total == 3 + assert client_configuration.retries.backoff_factor == 0.5