Skip to content

Commit

Permalink
Adding configuration to control retry parameters for k8s api client (#…
Browse files Browse the repository at this point in the history
…29809)

* Adding configuration to control retry parameters for k8s api client

* Handling review comments

* Fixing code bug

* Fixing failing tests

* Temporary commit with UT wip

* Fixing unit test

* Fixing the strict checks

* Handling review comments from Hussein

* Revert "Handling review comments from Hussein"

This reverts commit fa3bc26.

* Fixing failing ut

* Reverting bad hack

* Updating logic in kube_client.py

Co-authored-by: Hussein Awala <[email protected]>

* Fixing unit tests

* Fixing unit tests

* Handling review comments from Ash

* Fix loading mock call args for python3.7

* Apply suggestions from code review

* fix static check

* add in 2.6.0

---------

Co-authored-by: Amogh <[email protected]>
Co-authored-by: Hussein Awala <[email protected]>
(cherry picked from commit dcffbb4)
  • Loading branch information
amoghrajesh authored and ephraimbuddy committed Apr 14, 2023
1 parent 4fac945 commit eccac28
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 4 deletions.
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2565,6 +2565,13 @@ kubernetes_executor:
previous_name: kubernetes
version: 2.5.0
options:
api_client_retry_configuration:
description: |
Kwargs to override the default urllib3 Retry used in the kubernetes API client
version_added: 2.6.0
type: string
example: '{ "total": 3, "backoff_factor": 0.5 }'
default: ""
pod_template_file:
description: |
Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
Expand Down
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,10 @@ use_ssl = False
verify_certs = True

[kubernetes_executor]
# Kwargs to override the default urllib3 Retry used in the kubernetes API client
# Example: api_client_retry_configuration = {{ "total": 3, "backoff_factor": 0.5 }}
api_client_retry_configuration =

# Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
pod_template_file =

Expand Down
21 changes: 17 additions & 4 deletions airflow/kubernetes/kube_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import logging

import urllib3.util

from airflow.configuration import conf

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -107,16 +109,27 @@ def get_kube_client(
if conf.getboolean("kubernetes_executor", "enable_tcp_keepalive"):
_enable_tcp_keepalive()

configuration = _get_default_configuration()
api_client_retry_configuration = conf.getjson("kubernetes", "api_client_retry_configuration", fallback={})

if not conf.getboolean("kubernetes_executor", "verify_ssl"):
_disable_verify_ssl()

if isinstance(api_client_retry_configuration, dict):
configuration.retries = urllib3.util.Retry(**api_client_retry_configuration)
else:
raise ValueError("api_client_retry_configuration should be a dictionary")

if in_cluster:
config.load_incluster_config()
config.load_incluster_config(client_configuration=configuration)
else:
if cluster_context is None:
cluster_context = conf.get("kubernetes_executor", "cluster_context", fallback=None)
if config_file is None:
config_file = conf.get("kubernetes_executor", "config_file", fallback=None)
config.load_kube_config(config_file=config_file, context=cluster_context)

configuration = _get_default_configuration()
config.load_kube_config(
config_file=config_file, context=cluster_context, client_configuration=configuration
)

if not conf.getboolean("kubernetes_executor", "verify_ssl"):
configuration.verify_ssl = False
Expand Down
11 changes: 11 additions & 0 deletions tests/kubernetes/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from urllib3.connection import HTTPConnection, HTTPSConnection

from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
from tests.test_utils.config import conf_vars


class TestClient:
Expand All @@ -42,6 +43,7 @@ def test_load_file_config(self, config):
@mock.patch("airflow.kubernetes.kube_client.conf")
def test_load_config_disable_ssl(self, conf, config):
conf.getboolean.return_value = False
conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5}
client = get_kube_client(in_cluster=False)
conf.getboolean.assert_called_with("kubernetes_executor", "verify_ssl")
assert not client.api_client.configuration.verify_ssl
Expand All @@ -50,6 +52,7 @@ def test_load_config_disable_ssl(self, conf, config):
@mock.patch("airflow.kubernetes.kube_client.conf")
def test_load_config_ssl_ca_cert(self, conf, config):
conf.get.return_value = "/path/to/ca.crt"
conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5}
client = get_kube_client(in_cluster=False)
conf.get.assert_called_with("kubernetes_executor", "ssl_ca_cert")
assert client.api_client.configuration.ssl_ca_cert == "/path/to/ca.crt"
Expand Down Expand Up @@ -81,3 +84,11 @@ def test_disable_verify_ssl(self):
else:
configuration = Configuration()
assert not configuration.verify_ssl

@mock.patch("kubernetes.config.incluster_config.InClusterConfigLoader")
@conf_vars({("kubernetes", "api_client_retry_configuration"): '{"total": 3, "backoff_factor": 0.5}'})
def test_api_client_retry_configuration_correct_values(self, mock_in_cluster_loader):
get_kube_client(in_cluster=True)
client_configuration = mock_in_cluster_loader().load_and_set.call_args[0][0]
assert client_configuration.retries.total == 3
assert client_configuration.retries.backoff_factor == 0.5

0 comments on commit eccac28

Please sign in to comment.