
add missing read for K8S config file from conn in deferred KubernetesPodOperator #29498

Merged
merged 7 commits on Apr 22, 2023
13 changes: 11 additions & 2 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -37,7 +37,7 @@
from airflow.kubernetes import pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
from airflow.models import BaseOperator
from airflow.models import BaseOperator, Connection
from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
convert_affinity,
convert_configmap,
@@ -565,7 +565,16 @@ def execute_async(self, context: Context):

    def convert_config_file_to_dict(self):
        """Converts passed config_file to dict format."""
        config_file = self.config_file if self.config_file else os.environ.get(KUBE_CONFIG_ENV_VAR)
        config_file = None
Contributor

Thanks @hussein-awala for proposing this fix.

Why does the async path need the function convert_config_file_to_dict and not the sync one?

It looks like the async support was not implemented fully following this pattern -> #20578

Your PR fixes the problem for the extra config_path, but there is a risk that another option is already missing, or that a new one added in the future would need a "manual" fix like this.

Member Author

I am not sure about the initial reason for converting the file into a dictionary before creating the trigger; it may be to avoid copying the config file to the triggerer, since the pod is created on the worker using the sync hook while the waiting task runs on the triggerer and uses the async hook.

> there is a risk that another option is already missing, or that a new one added in the future would need a "manual" fix like this

With this fix we cover all options currently available for providing the configuration file (see the sketch just after this comment), and yes, if we add a new one in the future, we must add it in the sync hook and in this method.

@VladaZakharova can you please explain what the motivation was for converting the config file to a dictionary before creating the trigger?
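
To make the "all options" claim above concrete, here is a minimal sketch of the resolution order the fix implements. The helper name resolve_config_file is made up for illustration and is not provider code; it simply mirrors the diff further down.

```python
from __future__ import annotations

import os

KUBE_CONFIG_ENV_VAR = "KUBECONFIG"


def resolve_config_file(config_file: str | None, conn_extra: dict) -> str | None:
    # Illustrative helper, not provider code: the precedence used by the fix.
    # 1. An explicit config_file passed to the operator wins.
    if config_file:
        return config_file
    # 2. Otherwise, a kube config path stored in the Kubernetes connection extras.
    if conn_extra.get("extra__kubernetes__kube_config_path"):
        return conn_extra["extra__kubernetes__kube_config_path"]
    # 3. Finally, fall back to the KUBECONFIG environment variable, if set.
    return os.environ.get(KUBE_CONFIG_ENV_VAR)
```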

Contributor
@VladaZakharova Feb 14, 2023

Hi Team!
This was implemented so that the config file is converted to a dict to be passed to the trigger, and then to the hook to establish the connection.

Contributor

What do you mean by "lighten the credential management"?

Isn't the hook re-instantiated at every run of the trigger?

Contributor

We needed a way to pass the config file to the trigger to create a client for Kubernetes, but using the file system to communicate with the trigger was not a good solution. So we added the possibility to pass all config file parameters as a dict.
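
A rough sketch of that idea, with a hypothetical helper name (this is not the provider's actual trigger code): the worker reads the file once, and the trigger only ever receives a plain dict, so nothing has to be shared through the file system with the triggerer.

```python
import yaml


def build_trigger_kwargs(config_file: str) -> dict:
    # Hypothetical illustration: load the kube config on the worker and pass it
    # to the trigger as plain, serializable data instead of a file path that
    # may not exist on the triggerer's file system.
    with open(config_file) as f:
        config_dict = yaml.safe_load(f)
    return {"config_dict": config_dict}
```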

Member Author

To respect the pattern mentioned by @raphaelauv, I will try loading the config file in the async hook; this should work since the triggerer is initialized once.

Contributor

Please mind that all FS operations are blocking side effects. They violate the asyncio contract and can cause additional error logs about blocking code.
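
If the config file were instead loaded on the triggerer, as suggested above, one way to respect that constraint would be to push the blocking read off the event loop. A minimal sketch, assuming Python 3.9+ and a hypothetical helper name:

```python
import asyncio

import yaml


async def load_config_dict(config_file: str) -> dict:
    # Run the blocking open()/yaml.safe_load in a worker thread so the
    # triggerer's event loop is not stalled by file-system I/O.
    def _read() -> dict:
        with open(config_file) as f:
            return yaml.safe_load(f)

    return await asyncio.to_thread(_read)
```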

        if self.config_file:
            config_file = self.config_file
        elif self.kubernetes_conn_id:
            connection = Connection.get_connection_from_secrets(self.kubernetes_conn_id)
            extra = connection.extra_dejson
            if "extra__kubernetes__kube_config_path" in extra:
                config_file = extra["extra__kubernetes__kube_config_path"]
        if not config_file:
            config_file = os.environ.get(KUBE_CONFIG_ENV_VAR)
        if config_file:
            with open(config_file) as f:
                self._config_dict = yaml.safe_load(f)
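
For reference, a hedged example of the kind of connection the new kubernetes_conn_id branch reads from; the conn_id and path are placeholders, and the setup mirrors what the test below does.

```python
import json

from airflow.models import Connection
from airflow.utils.session import create_session

# Placeholder conn_id and path: the kube config path travels in the
# connection's extras under extra__kubernetes__kube_config_path, which is the
# key the deferred operator now reads.
with create_session() as session:
    session.add(
        Connection(
            conn_id="kubernetes_test_conf",
            conn_type="kubernetes",
            extra=json.dumps({"extra__kubernetes__kube_config_path": "/path/to/config/file1"}),
        )
    )
```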
55 changes: 53 additions & 2 deletions tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -16,10 +16,12 @@
# under the License.
from __future__ import annotations

import json
import os
import re
from contextlib import nullcontext
from unittest import mock
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, mock_open, patch

import pendulum
import pytest
@@ -30,7 +32,7 @@

from airflow.exceptions import AirflowException, TaskDeferred
from airflow.kubernetes.secret import Secret
from airflow.models import DAG, DagModel, DagRun, TaskInstance
from airflow.models import DAG, Connection, DagModel, DagRun, TaskInstance
from airflow.models.xcom import XCom
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
@@ -1355,3 +1357,52 @@ def test_async_write_logs_should_execute_successfully(
            post_complete_action.assert_called_once()
        else:
            mock_manager.return_value.read_pod_logs.assert_not_called()

    @pytest.mark.parametrize(
        "create_conn, export_env_var, extra_kwarg, config_file_path, should_be_called",
        [
            (True, False, {"kubernetes_conn_id": "kubernetes_test_conf"}, "/path/to/config/file1", True),
            (False, True, {}, "/path/to/config/file2", True),
            (False, False, {"config_file": "/path/to/config/file3"}, "/path/to/config/file3", True),
            (False, False, {}, None, False),
        ],
    )
    def test_load_config_file_from_conn(
        self, create_conn, export_env_var, extra_kwarg, config_file_path, should_be_called
    ):
        k = KubernetesPodOperator(
            task_id=TEST_TASK_ID,
            namespace=TEST_NAMESPACE,
            image=TEST_IMAGE,
            cmds=TEST_CMDS,
            arguments=TEST_ARGS,
            labels=TEST_LABELS,
            name=TEST_NAME,
            is_delete_operator_pod=False,
            in_cluster=True,
            get_logs=True,
            deferrable=True,
            **extra_kwarg,
        )
        if create_conn:
            connection = Connection(
                conn_id="kubernetes_test_conf",
                conn_type="kubernetes",
                extra=json.dumps({"extra__kubernetes__kube_config_path": config_file_path}),
            )
            with create_session() as session:
                session.add(connection)
        if export_env_var:
            os.environ["KUBECONFIG"] = config_file_path
        with patch("builtins.open", mock_open(read_data="{}")) as mock_file:
            k.convert_config_file_to_dict()
            if should_be_called:
                mock_file.assert_called_with(config_file_path)
            else:
                mock_file.assert_not_called()

        if create_conn:
            with create_session() as session:
                session.query(Connection).filter(Connection.conn_id == "kubernetes_test_conf").delete()
        if export_env_var:
            del os.environ["KUBECONFIG"]