From a85b839f5c6bd91341ec0301135aa948f3fcea63 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Mon, 7 Nov 2022 11:46:23 -0600 Subject: [PATCH 01/14] init pods --- docs/jobs.md | 2 +- docs/pods.md | 0 mkdocs.yml | 1 + prefect_kubernetes/jobs.py | 2 +- prefect_kubernetes/pods.py | 131 +++++++++++++++++++++++++++++++++++++ tests/test_pods.py | 15 +++++ 6 files changed, 149 insertions(+), 2 deletions(-) create mode 100644 docs/pods.md create mode 100644 prefect_kubernetes/pods.py create mode 100644 tests/test_pods.py diff --git a/docs/jobs.md b/docs/jobs.md index a8728be..bce587b 100644 --- a/docs/jobs.md +++ b/docs/jobs.md @@ -1 +1 @@ -:::prefect_kubernetes.jobs \ No newline at end of file +::: prefect_kubernetes.jobs \ No newline at end of file diff --git a/docs/pods.md b/docs/pods.md new file mode 100644 index 0000000..e69de29 diff --git a/mkdocs.yml b/mkdocs.yml index 80e9529..33cb620 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -39,3 +39,4 @@ nav: - Credentials: credentials.md - Exceptions: exceptions.md - Jobs: jobs.md + - Pods: pods.md diff --git a/prefect_kubernetes/jobs.py b/prefect_kubernetes/jobs.py index f994290..102a661 100644 --- a/prefect_kubernetes/jobs.py +++ b/prefect_kubernetes/jobs.py @@ -58,7 +58,7 @@ async def delete_namespaced_job( kubernetes_credentials: KubernetesCredentials, namespace: Optional[str] = "default", kube_kwargs: Optional[Dict[str, Any]] = None, - delete_option_kwargs: Optional[Dict] = None, + delete_option_kwargs: Optional[Dict[str, Any]] = None, ) -> V1Status: """Task for deleting a namespaced Kubernetes job. diff --git a/prefect_kubernetes/pods.py b/prefect_kubernetes/pods.py new file mode 100644 index 0000000..bcb1775 --- /dev/null +++ b/prefect_kubernetes/pods.py @@ -0,0 +1,131 @@ +"""Module for interacting with Kubernetes pods from Prefect flows.""" +from typing import Any, Dict, Optional + +from kubernetes.client.models import V1Pod, V1PodList +from prefect import task +from prefect.utilities.asyncutils import run_sync_in_worker_thread + +from prefect_kubernetes.credentials import KubernetesCredentials + + +@task +async def create_namespaced_pod( + body: Dict, + kubernetes_credentials: KubernetesCredentials, + namespace: Optional[str] = "default", + kube_kwargs: Optional[Dict[str, Any]] = None, +) -> V1Pod: + """Create a Kubernetes pod in a given namespace.""" + client = kubernetes_credentials.get_core_client() + + return await run_sync_in_worker_thread( + client.create_namespaced_pod, namespace=namespace, body=body, **kube_kwargs + ) + + +@task +async def delete_namespaced_pod( + pod_name: str, + kubernetes_credentials: KubernetesCredentials, + namespace: Optional[str] = "default", + kube_kwargs: Optional[Dict[str, Any]] = None, + delete_option_kwargs: Optional[Dict[str, Any]] = None, +) -> V1Pod: + """Delete a Kubernetes pod in a given namespace.""" + client = kubernetes_credentials.get_core_client() + + kube_kwargs = kube_kwargs or {} + + if delete_option_kwargs: + kube_kwargs.update(body=client.V1DeleteOptions(**delete_option_kwargs)) + + return await run_sync_in_worker_thread( + client.delete_namespaced_pod, pod_name, namespace=namespace, **kube_kwargs + ) + + +@task +async def list_namespaced_pod( + kubernetes_credentials: KubernetesCredentials, + namespace: Optional[str] = "default", + kube_kwargs: Optional[Dict[str, Any]] = None, +) -> V1PodList: + """List all pods in a given namespace.""" + client = kubernetes_credentials.get_core_client() + + return await run_sync_in_worker_thread( + client.list_namespaced_pod, namespace=namespace, **kube_kwargs + ) + + +@task +async def patch_namespaced_pod( + pod_name: str, + body: Dict, + kubernetes_credentials: KubernetesCredentials, + namespace: Optional[str] = "default", + kube_kwargs: Optional[Dict[str, Any]] = None, +) -> V1Pod: + """Patch a Kubernetes pod in a given namespace.""" + client = kubernetes_credentials.get_core_client() + + return await run_sync_in_worker_thread( + client.patch_namespaced_pod, + name=pod_name, + namespace=namespace, + body=body, + **kube_kwargs + ) + + +@task +async def read_namespaced_pod( + pod_name: str, + kubernetes_credentials: KubernetesCredentials, + namespace: Optional[str] = "default", + kube_kwargs: Optional[Dict[str, Any]] = None, +) -> V1Pod: + """Read information on a Kubernetes pod in a given namespace.""" + client = kubernetes_credentials.get_core_client() + + return await run_sync_in_worker_thread( + client.read_namespaced_pod, name=pod_name, namespace=namespace, **kube_kwargs + ) + + +@task +async def read_namespaced_pod_logs( + pod_name: str, + container: str, + kubernetes_credentials: KubernetesCredentials, + namespace: Optional[str] = "default", +) -> str: + """Read logs from a Kubernetes pod in a given namespace.""" + client = kubernetes_credentials.get_core_client() + + return await run_sync_in_worker_thread( + client.read_namespaced_pod_log, + name=pod_name, + namespace=namespace, + container=container, + ) + + +@task +async def replace_namespaced_pod( + pod_name: str, + body: Dict, + kubernetes_credentials: KubernetesCredentials, + namespace: Optional[str] = "default", + kube_kwargs: Optional[Dict[str, Any]] = None, +) -> V1Pod: + """Replace a Kubernetes pod in a given namespace.""" + client = kubernetes_credentials.get_core_client() + + return await run_sync_in_worker_thread( + client.replace_namespaced_pod, + body=body, + name=pod_name, + namespace=namespace, + **kube_kwargs + ) diff --git a/tests/test_pods.py b/tests/test_pods.py new file mode 100644 index 0000000..401aff8 --- /dev/null +++ b/tests/test_pods.py @@ -0,0 +1,15 @@ +import pytest + +from prefect_kubernetes.pods import ( + create_namespaced_pod, + delete_namespaced_pod, + list_namespaced_pod, + patch_namespaced_pod, + read_namespaced_pod, + read_namespaced_pod_logs, + replace_namespaced_pod, +) + + +async def test_create_namespaced_pod(_mock_api_core_client, kubernetes_credentials): + await create_namespaced_pod.fn() From fb00ef01abe9b42bf71480edc099e316dc7d950a Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 9 Nov 2022 15:11:06 -0600 Subject: [PATCH 02/14] add pod tests --- tests/test_pods.py | 115 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 113 insertions(+), 2 deletions(-) diff --git a/tests/test_pods.py b/tests/test_pods.py index 401aff8..f6b61e4 100644 --- a/tests/test_pods.py +++ b/tests/test_pods.py @@ -1,4 +1,5 @@ import pytest +from kubernetes.client.exceptions import ApiValueError from prefect_kubernetes.pods import ( create_namespaced_pod, @@ -11,5 +12,115 @@ ) -async def test_create_namespaced_pod(_mock_api_core_client, kubernetes_credentials): - await create_namespaced_pod.fn() +async def test_invalid_body_raises_error(kubernetes_credentials): + with pytest.raises(ApiValueError): + await create_namespaced_pod.fn( + body=None, kubernetes_credentials=kubernetes_credentials + ) + with pytest.raises(ApiValueError): + await patch_namespaced_pod.fn( + body=None, pod_name="", kubernetes_credentials=kubernetes_credentials + ) + + +async def test_create_namespaced_pod(kubernetes_credentials, _mock_api_core_client): + await create_namespaced_pod.fn( + body={"test": "a"}, + a="test", + kubernetes_credentials=kubernetes_credentials, + ) + + assert _mock_api_core_client.create_namespaced_pod.call_args[1]["body"] == { + "test": "a" + } + assert _mock_api_core_client.create_namespaced_pod.call_args[1]["a"] == "test" + + +async def test_delete_namespaced_pod(kubernetes_credentials, _mock_api_core_client): + await delete_namespaced_pod.fn( + pod_name="test_pod", + a="test", + kubernetes_credentials=kubernetes_credentials, + ) + assert ( + _mock_api_core_client.delete_namespaced_pod.call_args[1]["namespace"] + == "default" + ) + assert _mock_api_core_client.delete_namespaced_pod.call_args[1]["a"] == "test" + + +async def test_list_namespaced_pod(kubernetes_credentials, _mock_api_core_client): + await list_namespaced_pod.fn( + namespace="ns", + a="test", + kubernetes_credentials=kubernetes_credentials, + ) + assert _mock_api_core_client.list_namespaced_pod.call_args[1]["namespace"] == "ns" + assert _mock_api_core_client.list_namespaced_pod.call_args[1]["a"] == "test" + + +async def test_patch_namespaced_pod(kubernetes_credentials, _mock_api_core_client): + await patch_namespaced_pod.fn( + body={"test": "a"}, + pod_name="test_pod", + a="test", + kubernetes_credentials=kubernetes_credentials, + ) + assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["body"] == { + "test": "a" + } + assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["name"] == "test_pod" + assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["a"] == "test" + + +async def test_read_namespaced_pod(kubernetes_credentials, _mock_api_core_client): + await read_namespaced_pod.fn( + pod_name="test_pod", + namespace="ns", + a="test", + kubernetes_credentials=kubernetes_credentials, + ) + assert _mock_api_core_client.read_namespaced_pod.call_args[1]["name"] == "test_pod" + assert _mock_api_core_client.read_namespaced_pod.call_args[1]["namespace"] == "ns" + assert _mock_api_core_client.read_namespaced_pod.call_args[1]["a"] == "test" + + +async def test_read_namespaced_pod_logs(kubernetes_credentials, _mock_api_core_client): + await read_namespaced_pod_logs.fn( + pod_name="test_pod", + container="test_container", + namespace="ns", + a="test", + kubernetes_credentials=kubernetes_credentials, + ) + assert ( + _mock_api_core_client.read_namespaced_pod_log.call_args[1]["name"] == "test_pod" + ) + assert ( + _mock_api_core_client.read_namespaced_pod_log.call_args[1]["namespace"] == "ns" + ) + assert ( + _mock_api_core_client.read_namespaced_pod_log.call_args[1]["container"] + == "test_container" + ) + assert _mock_api_core_client.read_namespaced_pod_log.call_args[1]["a"] == "test" + + +async def test_replace_namespaced_pod(kubernetes_credentials, _mock_api_core_client): + await replace_namespaced_pod.fn( + pod_name="test_pod", + body={"test": "a"}, + namespace="ns", + a="test", + kubernetes_credentials=kubernetes_credentials, + ) + assert ( + _mock_api_core_client.replace_namespaced_pod.call_args[1]["name"] == "test_pod" + ) + assert ( + _mock_api_core_client.replace_namespaced_pod.call_args[1]["namespace"] == "ns" + ) + assert _mock_api_core_client.replace_namespaced_pod.call_args[1]["body"] == { + "test": "a" + } + assert _mock_api_core_client.replace_namespaced_pod.call_args[1]["a"] == "test" From 1d7f899680ff1965f90aa7557ce6be36d977ea8d Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 10 Nov 2022 20:58:30 -0600 Subject: [PATCH 03/14] rework client generation via credentials block, add more specific names --- prefect_kubernetes/credentials.py | 79 +++++++++++-------- prefect_kubernetes/jobs.py | 24 +++--- prefect_kubernetes/pods.py | 127 +++++++++++++++++++----------- tests/conftest.py | 18 ++--- tests/test_credentials.py | 19 +++-- 5 files changed, 158 insertions(+), 109 deletions(-) diff --git a/prefect_kubernetes/credentials.py b/prefect_kubernetes/credentials.py index 96f287b..510656b 100644 --- a/prefect_kubernetes/credentials.py +++ b/prefect_kubernetes/credentials.py @@ -1,13 +1,22 @@ """Module for defining Kubernetes credential handling and client generation.""" from contextlib import contextmanager -from typing import Generator, Optional +from typing import Generator, Optional, Union from kubernetes import config as kube_config -from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, CoreV1Api +from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, Configuration, CoreV1Api from kubernetes.config.config_exception import ConfigException from prefect.blocks.core import Block from prefect.blocks.kubernetes import KubernetesClusterConfig +from prefect.utilities.collections import listrepr + +KubernetesClient = Union[AppsV1Api, BatchV1Api, CoreV1Api] + +K8S_CLIENT_TYPES = { + "apps": AppsV1Api, + "batch": BatchV1Api, + "core": CoreV1Api, +} class KubernetesCredentials(Block): @@ -49,7 +58,7 @@ class KubernetesCredentials(Block): @flow def kubernetes_orchestrator(): create_namespaced_job( - body=V1Job(**{"Marvin": "42"}), + body=V1Job(**{"metadata": {"name": "my-job"}}), kubernetes_credentials=kubernetes_credentials ) ``` @@ -61,46 +70,40 @@ def kubernetes_orchestrator(): cluster_config: Optional[KubernetesClusterConfig] = None @contextmanager - def get_app_client(self) -> Generator[AppsV1Api, None, None]: + def get_client( + self, + client_type: str, + configuration: Optional[Configuration] = None, + ) -> Generator[KubernetesClient, None, None]: """Convenience method for retrieving a Kubernetes API client for deployment resources. - Yields: - Kubernetes API client to interact with "deployment" resources. - """ - with self.get_generic_client() as generic_client: - try: - yield AppsV1Api(api_client=generic_client) - finally: - generic_client.rest_client.pool_manager.clear() - - @contextmanager - def get_batch_client(self) -> Generator[BatchV1Api, None, None]: - """Convenience method for retrieving a Kubernetes API client for job resources. + Args: + client_type: The resource-specific type of Kubernetes client to retrieve. Yields: - Kubernetes API client to interact with "job" resources. - """ - with self.get_generic_client() as generic_client: - try: - yield BatchV1Api(api_client=generic_client) - finally: - generic_client.rest_client.pool_manager.clear() + An authenticated, resource-specific Kubernetes API client. - @contextmanager - def get_core_client(self) -> Generator[CoreV1Api, None, None]: - """Convenience method for retrieving a Kubernetes API client for core resources. + Example: + ```python + from prefect_kubernetes.credentials import KubernetesCredentials - Yields: - Kubernetes API client to interact with "pod", "service" - and "secret" resources. + with KubernetesCredentials.get_client("core") as core_v1_client: + for pod in core_v1_client.list_namespaced_pod(): + print(pod.metadata.name) + ``` """ - with self.get_generic_client() as generic_client: + client_config = configuration or Configuration() + + with ApiClient(configuration=client_config) as generic_client: try: - yield CoreV1Api(api_client=generic_client) + yield self.get_resource_specific_client(client_type) finally: generic_client.rest_client.pool_manager.clear() - def get_generic_client(self) -> ApiClient: + def get_resource_specific_client( + self, + client_type: str, + ) -> KubernetesClient: """ Utility function for configuring a generic Kubernetes client. It will attempt to connect to a Kubernetes cluster in three steps with @@ -116,6 +119,9 @@ def get_generic_client(self) -> ApiClient: 3. Attempt out-of-cluster connection using the default location for a kube config file. + Args: + resource: The Kubernetes resource to configure a client for. + Returns: An authenticated, generic Kubernetes Client. """ @@ -128,5 +134,10 @@ def get_generic_client(self) -> ApiClient: except ConfigException: kube_config.load_kube_config() - client = ApiClient() - return client + try: + return K8S_CLIENT_TYPES[client_type]() + except KeyError: + raise ValueError( + f"Invalid client type provided '{client_type}'." + f" Must be one of {listrepr(K8S_CLIENT_TYPES.keys())}." + ) diff --git a/prefect_kubernetes/jobs.py b/prefect_kubernetes/jobs.py index 7287eaf..0e9a5ed 100644 --- a/prefect_kubernetes/jobs.py +++ b/prefect_kubernetes/jobs.py @@ -45,10 +45,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_batch_client() as api_client: + with kubernetes_credentials.get_client("batch") as batch_v1_client: return await run_sync_in_worker_thread( - api_client.create_namespaced_job, + batch_v1_client.create_namespaced_job, namespace=namespace, body=body, **kube_kwargs, @@ -96,10 +96,10 @@ def kubernetes_orchestrator(): ``` """ - with kubernetes_credentials.get_batch_client() as api_client: + with kubernetes_credentials.get_client("batch") as batch_v1_client: return await run_sync_in_worker_thread( - api_client.delete_namespaced_job, + batch_v1_client.delete_namespaced_job, name=job_name, body=body, namespace=namespace, @@ -140,10 +140,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_batch_client() as api_client: + with kubernetes_credentials.get_client("batch") as batch_v1_client: return await run_sync_in_worker_thread( - api_client.list_namespaced_job, + batch_v1_client.list_namespaced_job, namespace=namespace, **kube_kwargs, ) @@ -193,10 +193,10 @@ def kubernetes_orchestrator(): ``` """ - with kubernetes_credentials.get_batch_client() as api_client: + with kubernetes_credentials.get_client("batch") as batch_v1_client: return await run_sync_in_worker_thread( - api_client.patch_namespaced_job, + batch_v1_client.patch_namespaced_job, name=job_name, namespace=namespace, body=body, @@ -242,10 +242,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_batch_client() as api_client: + with kubernetes_credentials.get_client("batch") as batch_v1_client: return await run_sync_in_worker_thread( - api_client.read_namespaced_job, + batch_v1_client.read_namespaced_job, name=job_name, namespace=namespace, **kube_kwargs, @@ -290,10 +290,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_batch_client() as api_client: + with kubernetes_credentials.get_client("batch") as batch_v1_client: return await run_sync_in_worker_thread( - api_client.replace_namespaced_job, + batch_v1_client.replace_namespaced_job, name=job_name, body=body, namespace=namespace, diff --git a/prefect_kubernetes/pods.py b/prefect_kubernetes/pods.py index 2cf6191..7539496 100644 --- a/prefect_kubernetes/pods.py +++ b/prefect_kubernetes/pods.py @@ -1,7 +1,9 @@ """Module for interacting with Kubernetes pods from Prefect flows.""" -from typing import Any, Dict, Optional +from typing import Any, Callable, Dict, Optional, Union +from kubernetes.client.exceptions import ApiException from kubernetes.client.models import V1DeleteOptions, V1Pod, V1PodList +from kubernetes.watch import Watch from prefect import task from prefect.utilities.asyncutils import run_sync_in_worker_thread @@ -11,12 +13,12 @@ @task async def create_namespaced_pod( kubernetes_credentials: KubernetesCredentials, - body: Dict, + body: V1Pod, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Pod: """Create a Kubernetes pod in a given namespace. - + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. @@ -34,7 +36,7 @@ async def create_namespaced_pod( from prefect_kubernetes.credentials import KubernetesCredentials from prefect_kubernetes.pods import create_namespaced_pod from kubernetes.client.models import V1Pod - + @flow def kubernetes_orchestrator(): v1_pod_metadata = create_namespaced_pod( @@ -43,10 +45,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_core_client() as api_client: + with kubernetes_credentials.get_client("core") as core_v1_client: return await run_sync_in_worker_thread( - api_client.create_namespaced_pod, + core_v1_client.create_namespaced_pod, namespace=namespace, body=body, **kube_kwargs, @@ -62,7 +64,7 @@ async def delete_namespaced_pod( **kube_kwargs: Dict[str, Any], ) -> V1Pod: """Delete a Kubernetes pod in a given namespace. - + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. @@ -70,10 +72,10 @@ async def delete_namespaced_pod( body: A Kubernetes `V1DeleteOptions` object. namespace: The Kubernetes namespace to delete this pod from. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. - + Returns: A Kubernetes `V1Pod` object. - + Example: Delete a pod in the default namespace: ```python @@ -81,7 +83,7 @@ async def delete_namespaced_pod( from prefect_kubernetes.credentials import KubernetesCredentials from prefect_kubernetes.pods import delete_namespaced_pod from kubernetes.client.models import V1DeleteOptions - + @flow def kubernetes_orchestrator(): v1_pod_metadata = delete_namespaced_pod( @@ -91,10 +93,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_core_client() as api_client: + with kubernetes_credentials.get_client("core") as core_v1_client: return await run_sync_in_worker_thread( - api_client.delete_namespaced_pod, + core_v1_client.delete_namespaced_pod, pod_name, body=body, namespace=namespace, @@ -109,23 +111,23 @@ async def list_namespaced_pod( **kube_kwargs: Dict[str, Any], ) -> V1PodList: """List all pods in a given namespace. - + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. namespace: The Kubernetes namespace to list pods from. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. - + Returns: A Kubernetes `V1PodList` object. - + Example: List all pods in the default namespace: ```python from prefect import flow from prefect_kubernetes.credentials import KubernetesCredentials from prefect_kubernetes.pods import list_namespaced_pod - + @flow def kubernetes_orchestrator(): v1_pod_list = list_namespaced_pod( @@ -133,10 +135,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_core_client() as api_client: + with kubernetes_credentials.get_client("core") as core_v1_client: return await run_sync_in_worker_thread( - api_client.list_namespaced_pod, namespace=namespace, **kube_kwargs + core_v1_client.list_namespaced_pod, namespace=namespace, **kube_kwargs ) @@ -149,7 +151,7 @@ async def patch_namespaced_pod( **kube_kwargs: Dict[str, Any], ) -> V1Pod: """Patch a Kubernetes pod in a given namespace. - + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. @@ -157,10 +159,10 @@ async def patch_namespaced_pod( body: A Kubernetes `V1Pod` object. namespace: The Kubernetes namespace to patch this pod in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. - + Returns: A Kubernetes `V1Pod` object. - + Example: Patch a pod in the default namespace: ```python @@ -168,7 +170,7 @@ async def patch_namespaced_pod( from prefect_kubernetes.credentials import KubernetesCredentials from prefect_kubernetes.pods import patch_namespaced_pod from kubernetes.client.models import V1Pod - + @flow def kubernetes_orchestrator(): v1_pod_metadata = patch_namespaced_pod( @@ -178,10 +180,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_core_client() as api_client: + with kubernetes_credentials.get_client("core") as core_v1_client: return await run_sync_in_worker_thread( - api_client.patch_namespaced_pod, + core_v1_client.patch_namespaced_pod, name=pod_name, namespace=namespace, body=body, @@ -197,23 +199,23 @@ async def read_namespaced_pod( **kube_kwargs: Dict[str, Any], ) -> V1Pod: """Read information on a Kubernetes pod in a given namespace. - + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. pod_name: The name of the pod to read. namespace: The Kubernetes namespace to read this pod from. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. - + Returns: A Kubernetes `V1Pod` object. - + Example: Read a pod in the default namespace: ```python from prefect import flow from prefect_kubernetes.credentials import KubernetesCredentials - + @flow def kubernetes_orchestrator(): v1_pod_metadata = read_namespaced_pod( @@ -222,10 +224,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_core_client() as api_client: + with kubernetes_credentials.get_client("core") as core_v1_client: return await run_sync_in_worker_thread( - api_client.read_namespaced_pod, + core_v1_client.read_namespaced_pod, name=pod_name, namespace=namespace, **kube_kwargs, @@ -238,47 +240,78 @@ async def read_namespaced_pod_logs( pod_name: str, container: str, namespace: Optional[str] = "default", + print_func: Optional[Callable] = None, **kube_kwargs: Dict[str, Any], -) -> str: +) -> Union[str, None]: """Read logs from a Kubernetes pod in a given namespace. - + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. pod_name: The name of the pod to read logs from. container: The name of the container to read logs from. namespace: The Kubernetes namespace to read this pod from. + print_func: If provided, it will stream the pod logs by calling `print_func` + for every line and returning `None`. If not provided, the current pod + logs will be returned immediately. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. - + Returns: A string containing the logs from the pod's container. - + Example: Read logs from a pod in the default namespace: ```python - from prefect import flow + from prefect import flow, get_run_logger from prefect_kubernetes.credentials import KubernetesCredentials from prefect_kubernetes.pods import read_namespaced_pod_logs - + @flow def kubernetes_orchestrator(): + logger = get_run_logger() + pod_logs = read_namespaced_pod_logs( kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), pod_name="test-pod", - container="test-container" + container="test-container", + print_func=logger.info ) ``` """ - with kubernetes_credentials.get_core_client() as api_client: + with kubernetes_credentials.get_client("core") as core_v1_client: - return await run_sync_in_worker_thread( - api_client.read_namespaced_pod_log, + if print_func is None: + return await run_sync_in_worker_thread( + core_v1_client.read_namespaced_pod_log, + name=pod_name, + namespace=namespace, + container=container, + **kube_kwargs, + ) + + # From the kubernetes.watch documentation: + # Note that watching an API resource can expire. The method tries to + # resume automatically once from the last result, but if that last result + # is too old as well, an `ApiException` exception will be thrown with + # ``code`` 410. + + read_logs_coroutine = run_sync_in_worker_thread( + core_v1_client.read_namespaced_pod_log, name=pod_name, namespace=namespace, container=container, - **kube_kwargs, ) + while True: + try: + for log_line in Watch().stream(await read_logs_coroutine): + print_func(log_line) + return + + except ApiException as e: + if e.status != 410: + raise + @task async def replace_namespaced_pod( @@ -289,7 +322,7 @@ async def replace_namespaced_pod( **kube_kwargs: Dict[str, Any], ) -> V1Pod: """Replace a Kubernetes pod in a given namespace. - + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. @@ -297,10 +330,10 @@ async def replace_namespaced_pod( body: A Kubernetes `V1Pod` object. namespace: The Kubernetes namespace to replace this pod in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. - + Returns: A Kubernetes `V1Pod` object. - + Example: Replace a pod in the default namespace: ```python @@ -308,7 +341,7 @@ async def replace_namespaced_pod( from prefect_kubernetes.credentials import KubernetesCredentials from prefect_kubernetes.pods import replace_namespaced_pod from kubernetes.client.models import V1Pod - + @flow def kubernetes_orchestrator(): v1_pod_metadata = replace_namespaced_pod( @@ -318,10 +351,10 @@ def kubernetes_orchestrator(): ) ``` """ - with kubernetes_credentials.get_core_client() as api_client: + with kubernetes_credentials.get_client("core") as core_v1_client: return await run_sync_in_worker_thread( - api_client.replace_namespaced_pod, + core_v1_client.replace_namespaced_pod, body=body, name=pod_name, namespace=namespace, diff --git a/tests/conftest.py b/tests/conftest.py index 7fcad96..e03d3b1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -41,12 +41,12 @@ def _mock_api_app_client(monkeypatch): app_client = MagicMock(spec=AppsV1Api) @contextmanager - def get_app_client(_): + def get_client(self, _): yield app_client monkeypatch.setattr( - "prefect_kubernetes.credentials.KubernetesCredentials.get_app_client", - get_app_client, + "prefect_kubernetes.credentials.KubernetesCredentials.get_client", + get_client, ) return app_client @@ -57,12 +57,12 @@ def _mock_api_batch_client(monkeypatch): batch_client = MagicMock(spec=BatchV1Api) @contextmanager - def get_batch_client(_): + def get_client(self, _): yield batch_client monkeypatch.setattr( - "prefect_kubernetes.credentials.KubernetesCredentials.get_batch_client", - get_batch_client, + "prefect_kubernetes.credentials.KubernetesCredentials.get_client", + get_client, ) return batch_client @@ -73,12 +73,12 @@ def _mock_api_core_client(monkeypatch): core_client = MagicMock(spec=CoreV1Api) @contextmanager - def get_core_client(_): + def get_client(self, _): yield core_client monkeypatch.setattr( - "prefect_kubernetes.credentials.KubernetesCredentials.get_core_client", - get_core_client, + "prefect_kubernetes.credentials.KubernetesCredentials.get_client", + get_client, ) return core_client diff --git a/tests/test_credentials.py b/tests/test_credentials.py index 17fdcb6..20cc5ce 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -3,15 +3,20 @@ @pytest.mark.parametrize( - "resource_type_method,client_type", + "resource_type,client_type", [ - ("get_app_client", AppsV1Api), - ("get_batch_client", BatchV1Api), - ("get_core_client", CoreV1Api), + ("apps", AppsV1Api), + ("batch", BatchV1Api), + ("core", CoreV1Api), ], ) -def test_client_return_type(kubernetes_credentials, resource_type_method, client_type): - resource_specific_client = getattr(kubernetes_credentials, resource_type_method) +def test_client_return_type(kubernetes_credentials, resource_type, client_type): - with resource_specific_client() as client: + with kubernetes_credentials.get_client(resource_type) as client: assert isinstance(client, client_type) + + +def test_client_bad_resource_type(kubernetes_credentials): + with pytest.raises(ValueError): + with kubernetes_credentials.get_client("shoo-ba-daba-doo"): + pass From d859fc02463845e9925870f30dabc6c8923606c0 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 10 Nov 2022 21:28:58 -0600 Subject: [PATCH 04/14] add tests verifying model inputs --- tests/test_jobs.py | 47 ++++++++++++++++++++++------------------ tests/test_pods.py | 53 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 67 insertions(+), 33 deletions(-) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 71515e6..7909e4b 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -1,5 +1,6 @@ import pytest from kubernetes.client.exceptions import ApiValueError +from kubernetes.client.models import V1Job from prefect_kubernetes.jobs import ( create_namespaced_job, @@ -11,7 +12,7 @@ ) -async def test_invalid_body_raises_error(kubernetes_credentials): +async def test_null_body_raises_error(kubernetes_credentials): with pytest.raises(ApiValueError): await create_namespaced_job.fn( body=None, kubernetes_credentials=kubernetes_credentials @@ -20,29 +21,33 @@ async def test_invalid_body_raises_error(kubernetes_credentials): await patch_namespaced_job.fn( body=None, job_name="", kubernetes_credentials=kubernetes_credentials ) + with pytest.raises(ApiValueError): + await replace_namespaced_job.fn( + body=None, job_name="", kubernetes_credentials=kubernetes_credentials + ) async def test_create_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await create_namespaced_job.fn( - body={"test": "a"}, + body=V1Job(**{"metadata": {"name": "test-job"}}), a="test", kubernetes_credentials=kubernetes_credentials, ) - assert _mock_api_batch_client.create_namespaced_job.call_args[1]["body"] == { - "test": "a" - } + assert _mock_api_batch_client.create_namespaced_job.call_args[1][ + "body" + ].metadata == {"name": "test-job"} assert _mock_api_batch_client.create_namespaced_job.call_args[1]["a"] == "test" async def test_delete_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await delete_namespaced_job.fn( - job_name="test_job", + job_name="test-job", a="test", kubernetes_credentials=kubernetes_credentials, ) assert ( - _mock_api_batch_client.delete_namespaced_job.call_args[1]["name"] == "test_job" + _mock_api_batch_client.delete_namespaced_job.call_args[1]["name"] == "test-job" ) assert _mock_api_batch_client.delete_namespaced_job.call_args[1]["a"] == "test" @@ -59,47 +64,47 @@ async def test_list_namespaced_job(kubernetes_credentials, _mock_api_batch_clien async def test_patch_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await patch_namespaced_job.fn( - body={"test": "a"}, - job_name="test_job", + body=V1Job(**{"metadata": {"name": "test-job"}}), + job_name="test-job", a="test", kubernetes_credentials=kubernetes_credentials, ) - assert _mock_api_batch_client.patch_namespaced_job.call_args[1]["body"] == { - "test": "a" - } + assert _mock_api_batch_client.patch_namespaced_job.call_args[1][ + "body" + ].metadata == {"name": "test-job"} assert ( - _mock_api_batch_client.patch_namespaced_job.call_args[1]["name"] == "test_job" + _mock_api_batch_client.patch_namespaced_job.call_args[1]["name"] == "test-job" ) assert _mock_api_batch_client.patch_namespaced_job.call_args[1]["a"] == "test" async def test_read_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await read_namespaced_job.fn( - job_name="test_job", + job_name="test-job", namespace="ns", a="test", kubernetes_credentials=kubernetes_credentials, ) - assert _mock_api_batch_client.read_namespaced_job.call_args[1]["name"] == "test_job" + assert _mock_api_batch_client.read_namespaced_job.call_args[1]["name"] == "test-job" assert _mock_api_batch_client.read_namespaced_job.call_args[1]["namespace"] == "ns" assert _mock_api_batch_client.read_namespaced_job.call_args[1]["a"] == "test" async def test_replace_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await replace_namespaced_job.fn( - job_name="test_job", - body={"test": "a"}, + job_name="test-job", + body=V1Job(**{"metadata": {"name": "test-job"}}), namespace="ns", a="test", kubernetes_credentials=kubernetes_credentials, ) assert ( - _mock_api_batch_client.replace_namespaced_job.call_args[1]["name"] == "test_job" + _mock_api_batch_client.replace_namespaced_job.call_args[1]["name"] == "test-job" ) assert ( _mock_api_batch_client.replace_namespaced_job.call_args[1]["namespace"] == "ns" ) - assert _mock_api_batch_client.replace_namespaced_job.call_args[1]["body"] == { - "test": "a" - } + assert _mock_api_batch_client.replace_namespaced_job.call_args[1][ + "body" + ].metadata == {"name": "test-job"} assert _mock_api_batch_client.replace_namespaced_job.call_args[1]["a"] == "test" diff --git a/tests/test_pods.py b/tests/test_pods.py index f6b61e4..74bb95e 100644 --- a/tests/test_pods.py +++ b/tests/test_pods.py @@ -1,5 +1,6 @@ import pytest from kubernetes.client.exceptions import ApiValueError +from kubernetes.client.models import V1DeleteOptions, V1Pod from prefect_kubernetes.pods import ( create_namespaced_pod, @@ -25,28 +26,44 @@ async def test_invalid_body_raises_error(kubernetes_credentials): async def test_create_namespaced_pod(kubernetes_credentials, _mock_api_core_client): await create_namespaced_pod.fn( - body={"test": "a"}, + body=V1Pod(**{"metadata": {"name": "test-pod"}}), a="test", kubernetes_credentials=kubernetes_credentials, ) - assert _mock_api_core_client.create_namespaced_pod.call_args[1]["body"] == { - "test": "a" - } + assert _mock_api_core_client.create_namespaced_pod.call_args[1][ + "body" + ].metadata == {"name": "test-pod"} assert _mock_api_core_client.create_namespaced_pod.call_args[1]["a"] == "test" async def test_delete_namespaced_pod(kubernetes_credentials, _mock_api_core_client): await delete_namespaced_pod.fn( + kubernetes_credentials=kubernetes_credentials, pod_name="test_pod", + body=V1DeleteOptions(grace_period_seconds=42), a="test", - kubernetes_credentials=kubernetes_credentials, ) assert ( _mock_api_core_client.delete_namespaced_pod.call_args[1]["namespace"] == "default" ) assert _mock_api_core_client.delete_namespaced_pod.call_args[1]["a"] == "test" + assert ( + _mock_api_core_client.delete_namespaced_pod.call_args[1][ + "body" + ].grace_period_seconds + == 42 + ) + + +async def test_bad_v1_delete_options(kubernetes_credentials, _mock_api_core_client): + with pytest.raises(TypeError): + await delete_namespaced_pod.fn( + kubernetes_credentials=kubernetes_credentials, + pod_name="test_pod", + body=V1DeleteOptions(skrrrt_skrrrt="yeehaw"), + ) async def test_list_namespaced_pod(kubernetes_credentials, _mock_api_core_client): @@ -61,13 +78,13 @@ async def test_list_namespaced_pod(kubernetes_credentials, _mock_api_core_client async def test_patch_namespaced_pod(kubernetes_credentials, _mock_api_core_client): await patch_namespaced_pod.fn( - body={"test": "a"}, + body=V1Pod(**{"metadata": {"name": "test-pod"}}), pod_name="test_pod", a="test", kubernetes_credentials=kubernetes_credentials, ) - assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["body"] == { - "test": "a" + assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["body"].metadata == { + "name": "test-pod" } assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["name"] == "test_pod" assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["a"] == "test" @@ -109,7 +126,7 @@ async def test_read_namespaced_pod_logs(kubernetes_credentials, _mock_api_core_c async def test_replace_namespaced_pod(kubernetes_credentials, _mock_api_core_client): await replace_namespaced_pod.fn( pod_name="test_pod", - body={"test": "a"}, + body=V1Pod(**{"metadata": {"name": "test-pod"}}), namespace="ns", a="test", kubernetes_credentials=kubernetes_credentials, @@ -120,7 +137,19 @@ async def test_replace_namespaced_pod(kubernetes_credentials, _mock_api_core_cli assert ( _mock_api_core_client.replace_namespaced_pod.call_args[1]["namespace"] == "ns" ) - assert _mock_api_core_client.replace_namespaced_pod.call_args[1]["body"] == { - "test": "a" - } + assert _mock_api_core_client.replace_namespaced_pod.call_args[1][ + "body" + ].metadata == {"name": "test-pod"} assert _mock_api_core_client.replace_namespaced_pod.call_args[1]["a"] == "test" + + +@pytest.mark.parametrize( + "task_accepting_pod", + [create_namespaced_pod, patch_namespaced_pod, replace_namespaced_pod], +) +async def test_bad_v1_pod_kwargs(kubernetes_credentials, task_accepting_pod): + with pytest.raises(TypeError): + await task_accepting_pod.fn( + body=V1Pod(**{"random_not_real": "shabba-ranks"}), + kubernetes_credentials=kubernetes_credentials, + ) From b39cb442804089241178a894a2bd95f6393cb7a1 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 10 Nov 2022 21:32:54 -0600 Subject: [PATCH 05/14] update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d454b7..357f8bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `KubernetesCredentials` block for generating authenticated Kubernetes clients - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19) - Tasks for interacting with `job` resources: `create_namespaced_job`, `delete_namespaced_job`, `list_namespaced_job`, `patch_namespaced_job`, `read_namespaced_job`, `replace_namespaced_job` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19) +- Tasks for interacting with `pod` resources: `create_namespaced_pod`, `delete_namespaced_pod`, `list_namespaced_pod`, `patch_namespaced_pod`, `read_namespaced_pod`, `read_namespaced_pod_logs`, `replace_namespaced_pod` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/21) ### Changed +- `KubernetesCredentials` block to use a single `get_client` method capable of creating all resource-specific client types - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21) + ### Deprecated From 784523afc07522e96461c2b6f39c53c68f019db6 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 10 Nov 2022 21:41:35 -0600 Subject: [PATCH 06/14] whoops forgot to populate pods.md --- docs/pods.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/pods.md b/docs/pods.md index e69de29..08edc9c 100644 --- a/docs/pods.md +++ b/docs/pods.md @@ -0,0 +1 @@ +::: prefect_kubernetes.pods \ No newline at end of file From d4f52f04dc852a550a013ad1b8a5e8ccadb02773 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 10 Nov 2022 21:52:20 -0600 Subject: [PATCH 07/14] update examples in readme --- README.md | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 314b476..d1984a5 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## Welcome! -Prefect integrations for interacting with prefect-kubernetes. +Prefect integrations for interacting with Kubernetes resources. ## Getting Started @@ -23,6 +23,21 @@ pip install prefect-kubernetes ``` ### Write and run a flow +#### Generate a resource-specific client from `KubernetesClusterConfig` + +```python +from prefect.blocks.kubernetes import KubernetesClusterConfig +from prefect_kubernetes.credentials import KubernetesCredentials + +k8s_config = KubernetesClusterConfig.from_file('~/.kube/config') + +k8s_credentials = KubernetesCredentials(cluster_config=k8s_config) + +with k8s_credentials.get_client("core") as v1_core_client: + for pod in v1_core_client.list_namespaced_pod('default').items: + print(pod.metadata.name) +``` + #### List jobs in a specific namespace ```python @@ -32,9 +47,27 @@ from prefect_kubernetes.jobs import list_namespaced_job @flow def kubernetes_orchestrator(): - namespaced_job_list = list_namespaced_job( + v1_job_list = list_namespaced_job( + kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), namespace="my-namespace", + ) +``` + +#### Delete a pod using `V1DeleteOptions` + +```python +from kubernetes.client.models import V1DeleteOptions + +from prefect import flow +from prefect_kubernetes.credentials import KubernetesCredentials +from prefect_kubernetes.pods import delete_namespaced_pod + +@flow +def kubernetes_orchestrator(): + v1_pod_list = delete_namespaced_pod( kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), + body=V1DeleteOptions(grace_period_seconds=42), + namespace="my-namespace" ) ``` From 24737aff8b16ccec9ef407e5a8ab5c25a329dab1 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 10 Nov 2022 22:03:17 -0600 Subject: [PATCH 08/14] fix examples for `KubernetesCredentials` --- prefect_kubernetes/credentials.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/prefect_kubernetes/credentials.py b/prefect_kubernetes/credentials.py index 510656b..f203267 100644 --- a/prefect_kubernetes/credentials.py +++ b/prefect_kubernetes/credentials.py @@ -36,13 +36,19 @@ class KubernetesCredentials(Block): Create resource-specific API clients from KubernetesCredentials: ```python + from kubernetes.client.models import AppsV1Api, BatchV1Api, CoreV1Api from prefect_kubernetes import KubernetesCredentials kubernetes_credentials = KubernetesCredentials.load("my-k8s-credentials") - kubernetes_app_v1_client = kubernetes_credentials.get_app_client() - kubernetes_batch_v1_client = kubernetes_credentials.get_batch_client() - kubernetes_core_v1_client = kubernetes_credentials.get_core_client() + with kubernetes_credentials.get_client("apps") as v1_apps_client: + assert isinstance(v1_apps_client, AppsV1Api) + + with kubernetes_credentials.get_client("batch") as v1_batch_client: + assert isinstance(v1_batch_client, BatchV1Api) + + with kubernetes_credentials.get_client("core") as v1_core_client: + assert isinstance(v1_core_client, CoreV1Api) ``` Create a namespaced job: @@ -58,8 +64,8 @@ class KubernetesCredentials(Block): @flow def kubernetes_orchestrator(): create_namespaced_job( + kubernetes_credentials=kubernetes_credentials, body=V1Job(**{"metadata": {"name": "my-job"}}), - kubernetes_credentials=kubernetes_credentials ) ``` """ @@ -103,7 +109,7 @@ def get_client( def get_resource_specific_client( self, client_type: str, - ) -> KubernetesClient: + ) -> Union[AppsV1Api, BatchV1Api, CoreV1Api]: """ Utility function for configuring a generic Kubernetes client. It will attempt to connect to a Kubernetes cluster in three steps with @@ -120,10 +126,14 @@ def get_resource_specific_client( kube config file. Args: - resource: The Kubernetes resource to configure a client for. + client_type: The Kubernetes API client type for interacting with specific + Kubernetes resources. Returns: - An authenticated, generic Kubernetes Client. + KubernetesClient: An authenticated, resource-specific Kubernetes Client. + + Raises: + ValueError: If `client_type` is not a valid Kubernetes API client type. """ if self.cluster_config: From 11cdd92893c06a0d43ee229d557382d8db403894 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Thu, 10 Nov 2022 22:50:06 -0600 Subject: [PATCH 09/14] add coverage for custom print func for read pod logs --- prefect_kubernetes/pods.py | 17 ++++++++--------- tests/conftest.py | 10 ++++++++++ tests/test_pods.py | 15 ++++++++++++++- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/prefect_kubernetes/pods.py b/prefect_kubernetes/pods.py index 7539496..a0dba6d 100644 --- a/prefect_kubernetes/pods.py +++ b/prefect_kubernetes/pods.py @@ -1,4 +1,5 @@ """Module for interacting with Kubernetes pods from Prefect flows.""" + from typing import Any, Callable, Dict, Optional, Union from kubernetes.client.exceptions import ApiException @@ -289,22 +290,20 @@ def kubernetes_orchestrator(): **kube_kwargs, ) - # From the kubernetes.watch documentation: + # From the `kubernetes.watch` documentation: # Note that watching an API resource can expire. The method tries to # resume automatically once from the last result, but if that last result # is too old as well, an `ApiException` exception will be thrown with # ``code`` 410. - read_logs_coroutine = run_sync_in_worker_thread( - core_v1_client.read_namespaced_pod_log, - name=pod_name, - namespace=namespace, - container=container, - ) - while True: try: - for log_line in Watch().stream(await read_logs_coroutine): + for log_line in Watch().stream( + core_v1_client.read_namespaced_pod_log, + name=pod_name, + namespace=namespace, + container=container, + ): print_func(log_line) return diff --git a/tests/conftest.py b/tests/conftest.py index e03d3b1..6593f38 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ import pytest import yaml from kubernetes.client import AppsV1Api, BatchV1Api, CoreV1Api +from kubernetes.client.exceptions import ApiException from prefect.blocks.kubernetes import KubernetesClusterConfig from prefect_kubernetes.credentials import KubernetesCredentials @@ -82,3 +83,12 @@ def get_client(self, _): ) return core_client + + +@pytest.fixture +def mock_stream_timeout(monkeypatch): + + monkeypatch.setattr( + "kubernetes.watch.Watch.stream", + MagicMock(side_effect=ApiException(status=408)), + ) diff --git a/tests/test_pods.py b/tests/test_pods.py index 74bb95e..70381c4 100644 --- a/tests/test_pods.py +++ b/tests/test_pods.py @@ -1,5 +1,5 @@ import pytest -from kubernetes.client.exceptions import ApiValueError +from kubernetes.client.exceptions import ApiException, ApiValueError from kubernetes.client.models import V1DeleteOptions, V1Pod from prefect_kubernetes.pods import ( @@ -153,3 +153,16 @@ async def test_bad_v1_pod_kwargs(kubernetes_credentials, task_accepting_pod): body=V1Pod(**{"random_not_real": "shabba-ranks"}), kubernetes_credentials=kubernetes_credentials, ) + + +async def test_read_pod_logs_custom_print_func_timeout( + kubernetes_credentials, mock_stream_timeout +): + with pytest.raises(ApiException): + await read_namespaced_pod_logs.fn( + kubernetes_credentials=kubernetes_credentials, + pod_name="test_pod", + container="test_container", + namespace="ns", + print_func=print, + ) From bdfb395982436cce552119a0b996fb42f66c09dd Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Fri, 11 Nov 2022 14:32:51 -0600 Subject: [PATCH 10/14] update job body names according to pod changes --- prefect_kubernetes/credentials.py | 4 ++-- prefect_kubernetes/jobs.py | 32 +++++++++++++++--------------- prefect_kubernetes/pods.py | 33 +++++++++++++++---------------- tests/test_credentials.py | 4 +++- tests/test_pods.py | 28 +++++++++++++++----------- 5 files changed, 53 insertions(+), 48 deletions(-) diff --git a/prefect_kubernetes/credentials.py b/prefect_kubernetes/credentials.py index f203267..3dd4481 100644 --- a/prefect_kubernetes/credentials.py +++ b/prefect_kubernetes/credentials.py @@ -1,7 +1,7 @@ """Module for defining Kubernetes credential handling and client generation.""" from contextlib import contextmanager -from typing import Generator, Optional, Union +from typing import Generator, Literal, Optional, Union from kubernetes import config as kube_config from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, Configuration, CoreV1Api @@ -78,7 +78,7 @@ def kubernetes_orchestrator(): @contextmanager def get_client( self, - client_type: str, + client_type: Literal["apps", "batch", "core"], configuration: Optional[Configuration] = None, ) -> Generator[KubernetesClient, None, None]: """Convenience method for retrieving a Kubernetes API client for deployment resources. diff --git a/prefect_kubernetes/jobs.py b/prefect_kubernetes/jobs.py index 0e9a5ed..b7771c9 100644 --- a/prefect_kubernetes/jobs.py +++ b/prefect_kubernetes/jobs.py @@ -12,7 +12,7 @@ @task async def create_namespaced_job( kubernetes_credentials: KubernetesCredentials, - body: V1Job, + new_job: V1Job, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Job: @@ -21,7 +21,7 @@ async def create_namespaced_job( Args: kubernetes_credentials: `KubernetesCredentials` block holding authentication needed to generate the required API client. - body: A Kubernetes `V1Job` specification. + new_job: A Kubernetes `V1Job` specification. namespace: The Kubernetes namespace to create this job in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API (e.g. `{"pretty": "...", "dry_run": "..."}`). @@ -40,7 +40,7 @@ async def create_namespaced_job( @flow def kubernetes_orchestrator(): v1_job_metadata = create_namespaced_job( - body=V1Job(**{"metadata": {"name": "test-job"}}), + new_job=V1Job(metadata={"labels": {"foo": "bar"}}), kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), ) ``` @@ -50,7 +50,7 @@ def kubernetes_orchestrator(): return await run_sync_in_worker_thread( batch_v1_client.create_namespaced_job, namespace=namespace, - body=body, + body=new_job, **kube_kwargs, ) @@ -59,7 +59,7 @@ def kubernetes_orchestrator(): async def delete_namespaced_job( kubernetes_credentials: KubernetesCredentials, job_name: str, - body: Optional[V1DeleteOptions] = None, + delete_options: Optional[V1DeleteOptions] = None, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Status: @@ -69,7 +69,7 @@ async def delete_namespaced_job( kubernetes_credentials: `KubernetesCredentials` block holding authentication needed to generate the required API client. job_name: The name of a job to delete. - body: A Kubernetes `V1DeleteOptions` object. + delete_options: A Kubernetes `V1DeleteOptions` object. namespace: The Kubernetes namespace to delete this job in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API (e.g. `{"pretty": "...", "dry_run": "..."}`). @@ -91,7 +91,7 @@ def kubernetes_orchestrator(): v1_job_status = delete_namespaced_job( job_name="my-job", kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), - body=V1DeleteOptions(propagation_policy="Foreground"), + delete_options=V1DeleteOptions(propagation_policy="Foreground"), ) ``` """ @@ -101,7 +101,7 @@ def kubernetes_orchestrator(): return await run_sync_in_worker_thread( batch_v1_client.delete_namespaced_job, name=job_name, - body=body, + body=delete_options, namespace=namespace, **kube_kwargs, ) @@ -153,7 +153,7 @@ def kubernetes_orchestrator(): async def patch_namespaced_job( kubernetes_credentials: KubernetesCredentials, job_name: str, - body: V1Job, + job_updates: V1Job, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Job: @@ -163,7 +163,7 @@ async def patch_namespaced_job( kubernetes_credentials: KubernetesCredentials block holding authentication needed to generate the required API client. job_name: The name of a job to patch. - body: A Kubernetes `V1Job` specification. + job_updates: A Kubernetes `V1Job` specification. namespace: The Kubernetes namespace to patch this job in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API (e.g. `{"pretty": "...", "dry_run": "..."}`). @@ -187,7 +187,7 @@ async def patch_namespaced_job( def kubernetes_orchestrator(): v1_job_metadata = patch_namespaced_job( job_name="my-job", - body=V1Job(**{"metadata": {"labels": {"foo": "bar"}}}), + job_updates=V1Job(metadata={"labels": {"foo": "bar"}}}), kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), ) ``` @@ -199,7 +199,7 @@ def kubernetes_orchestrator(): batch_v1_client.patch_namespaced_job, name=job_name, namespace=namespace, - body=body, + body=job_updates, **kube_kwargs, ) @@ -256,7 +256,7 @@ def kubernetes_orchestrator(): async def replace_namespaced_job( kubernetes_credentials: KubernetesCredentials, job_name: str, - body: V1Job, + new_job: V1Job, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Job: @@ -266,7 +266,7 @@ async def replace_namespaced_job( kubernetes_credentials: `KubernetesCredentials` block holding authentication needed to generate the required API client. job_name: The name of a job to replace. - body: A Kubernetes `V1Job` specification. + new_job: A Kubernetes `V1Job` specification. namespace: The Kubernetes namespace to replace this job in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API (e.g. `{"pretty": "...", "dry_run": "..."}`). @@ -284,7 +284,7 @@ async def replace_namespaced_job( @flow def kubernetes_orchestrator(): v1_job_metadata = replace_namespaced_job( - body={"metadata": {"labels": {"foo": "bar"}}}, + new_job=V1Job(metadata={"labels": {"foo": "bar"}}), job_name="my-job", kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), ) @@ -295,7 +295,7 @@ def kubernetes_orchestrator(): return await run_sync_in_worker_thread( batch_v1_client.replace_namespaced_job, name=job_name, - body=body, + body=new_job, namespace=namespace, **kube_kwargs, ) diff --git a/prefect_kubernetes/pods.py b/prefect_kubernetes/pods.py index a0dba6d..51effa8 100644 --- a/prefect_kubernetes/pods.py +++ b/prefect_kubernetes/pods.py @@ -1,5 +1,4 @@ """Module for interacting with Kubernetes pods from Prefect flows.""" - from typing import Any, Callable, Dict, Optional, Union from kubernetes.client.exceptions import ApiException @@ -14,7 +13,7 @@ @task async def create_namespaced_pod( kubernetes_credentials: KubernetesCredentials, - body: V1Pod, + new_pod: V1Pod, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Pod: @@ -23,7 +22,7 @@ async def create_namespaced_pod( Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. - body: A Kubernetes `V1Pod` specification. + new_pod: A Kubernetes `V1Pod` specification. namespace: The Kubernetes namespace to create this pod in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. @@ -42,7 +41,7 @@ async def create_namespaced_pod( def kubernetes_orchestrator(): v1_pod_metadata = create_namespaced_pod( kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), - body=V1Pod(**{"metadata": {"name": "test-pod"}}), + new_pod=V1Pod(metadata={"name": "test-pod"}), ) ``` """ @@ -51,7 +50,7 @@ def kubernetes_orchestrator(): return await run_sync_in_worker_thread( core_v1_client.create_namespaced_pod, namespace=namespace, - body=body, + body=new_pod, **kube_kwargs, ) @@ -60,7 +59,7 @@ def kubernetes_orchestrator(): async def delete_namespaced_pod( kubernetes_credentials: KubernetesCredentials, pod_name: str, - body: Optional[V1DeleteOptions] = None, + delete_options: Optional[V1DeleteOptions] = None, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Pod: @@ -70,7 +69,7 @@ async def delete_namespaced_pod( kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. pod_name: The name of the pod to delete. - body: A Kubernetes `V1DeleteOptions` object. + delete_options: A Kubernetes `V1DeleteOptions` object. namespace: The Kubernetes namespace to delete this pod from. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. @@ -90,7 +89,7 @@ def kubernetes_orchestrator(): v1_pod_metadata = delete_namespaced_pod( kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), pod_name="test-pod", - body=V1DeleteOptions(grace_period_seconds=0), + delete_options=V1DeleteOptions(grace_period_seconds=0), ) ``` """ @@ -99,7 +98,7 @@ def kubernetes_orchestrator(): return await run_sync_in_worker_thread( core_v1_client.delete_namespaced_pod, pod_name, - body=body, + body=delete_options, namespace=namespace, **kube_kwargs, ) @@ -147,7 +146,7 @@ def kubernetes_orchestrator(): async def patch_namespaced_pod( kubernetes_credentials: KubernetesCredentials, pod_name: str, - body: V1Pod, + pod_updates: V1Pod, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Pod: @@ -157,7 +156,7 @@ async def patch_namespaced_pod( kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. pod_name: The name of the pod to patch. - body: A Kubernetes `V1Pod` object. + pod_updates: A Kubernetes `V1Pod` object. namespace: The Kubernetes namespace to patch this pod in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. @@ -177,7 +176,7 @@ def kubernetes_orchestrator(): v1_pod_metadata = patch_namespaced_pod( kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), pod_name="test-pod", - body=V1Pod(**{"metadata": {"labels": {"foo": "bar"}}}), + pod_updates=V1Pod(metadata={"labels": {"foo": "bar"}}), ) ``` """ @@ -187,7 +186,7 @@ def kubernetes_orchestrator(): core_v1_client.patch_namespaced_pod, name=pod_name, namespace=namespace, - body=body, + body=pod_updates, **kube_kwargs, ) @@ -316,7 +315,7 @@ def kubernetes_orchestrator(): async def replace_namespaced_pod( kubernetes_credentials: KubernetesCredentials, pod_name: str, - body: V1Pod, + new_pod: V1Pod, namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Pod: @@ -326,7 +325,7 @@ async def replace_namespaced_pod( kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. pod_name: The name of the pod to replace. - body: A Kubernetes `V1Pod` object. + new_pod: A Kubernetes `V1Pod` object. namespace: The Kubernetes namespace to replace this pod in. **kube_kwargs: Optional extra keyword arguments to pass to the Kubernetes API. @@ -346,7 +345,7 @@ def kubernetes_orchestrator(): v1_pod_metadata = replace_namespaced_pod( kubernetes_credentials=KubernetesCredentials.load("k8s-creds"), pod_name="test-pod", - body=V1Pod(**{"metadata": {"labels": {"foo": "bar"}}}) + new_pod=V1Pod(metadata={"labels": {"foo": "bar"}}) ) ``` """ @@ -354,7 +353,7 @@ def kubernetes_orchestrator(): return await run_sync_in_worker_thread( core_v1_client.replace_namespaced_pod, - body=body, + body=new_pod, name=pod_name, namespace=namespace, **kube_kwargs, diff --git a/tests/test_credentials.py b/tests/test_credentials.py index 20cc5ce..2168d3e 100644 --- a/tests/test_credentials.py +++ b/tests/test_credentials.py @@ -17,6 +17,8 @@ def test_client_return_type(kubernetes_credentials, resource_type, client_type): def test_client_bad_resource_type(kubernetes_credentials): - with pytest.raises(ValueError): + with pytest.raises( + ValueError, match="Invalid client type provided 'shoo-ba-daba-doo'" + ): with kubernetes_credentials.get_client("shoo-ba-daba-doo"): pass diff --git a/tests/test_pods.py b/tests/test_pods.py index 70381c4..1b521ae 100644 --- a/tests/test_pods.py +++ b/tests/test_pods.py @@ -16,17 +16,17 @@ async def test_invalid_body_raises_error(kubernetes_credentials): with pytest.raises(ApiValueError): await create_namespaced_pod.fn( - body=None, kubernetes_credentials=kubernetes_credentials + new_pod=None, kubernetes_credentials=kubernetes_credentials ) with pytest.raises(ApiValueError): await patch_namespaced_pod.fn( - body=None, pod_name="", kubernetes_credentials=kubernetes_credentials + pod_updates=None, pod_name="", kubernetes_credentials=kubernetes_credentials ) async def test_create_namespaced_pod(kubernetes_credentials, _mock_api_core_client): await create_namespaced_pod.fn( - body=V1Pod(**{"metadata": {"name": "test-pod"}}), + new_pod=V1Pod(metadata={"name": "test-pod"}), a="test", kubernetes_credentials=kubernetes_credentials, ) @@ -41,7 +41,7 @@ async def test_delete_namespaced_pod(kubernetes_credentials, _mock_api_core_clie await delete_namespaced_pod.fn( kubernetes_credentials=kubernetes_credentials, pod_name="test_pod", - body=V1DeleteOptions(grace_period_seconds=42), + delete_options=V1DeleteOptions(grace_period_seconds=42), a="test", ) assert ( @@ -62,7 +62,7 @@ async def test_bad_v1_delete_options(kubernetes_credentials, _mock_api_core_clie await delete_namespaced_pod.fn( kubernetes_credentials=kubernetes_credentials, pod_name="test_pod", - body=V1DeleteOptions(skrrrt_skrrrt="yeehaw"), + delete_options=V1DeleteOptions(skrrrt_skrrrt="yeehaw"), ) @@ -78,10 +78,10 @@ async def test_list_namespaced_pod(kubernetes_credentials, _mock_api_core_client async def test_patch_namespaced_pod(kubernetes_credentials, _mock_api_core_client): await patch_namespaced_pod.fn( - body=V1Pod(**{"metadata": {"name": "test-pod"}}), + kubernetes_credentials=kubernetes_credentials, + pod_updates=V1Pod(metadata={"name": "test-pod"}), pod_name="test_pod", a="test", - kubernetes_credentials=kubernetes_credentials, ) assert _mock_api_core_client.patch_namespaced_pod.call_args[1]["body"].metadata == { "name": "test-pod" @@ -126,7 +126,7 @@ async def test_read_namespaced_pod_logs(kubernetes_credentials, _mock_api_core_c async def test_replace_namespaced_pod(kubernetes_credentials, _mock_api_core_client): await replace_namespaced_pod.fn( pod_name="test_pod", - body=V1Pod(**{"metadata": {"name": "test-pod"}}), + new_pod=V1Pod(metadata={"name": "test-pod"}), namespace="ns", a="test", kubernetes_credentials=kubernetes_credentials, @@ -144,13 +144,17 @@ async def test_replace_namespaced_pod(kubernetes_credentials, _mock_api_core_cli @pytest.mark.parametrize( - "task_accepting_pod", - [create_namespaced_pod, patch_namespaced_pod, replace_namespaced_pod], + "task_accepting_pod, pod_kwarg", + [ + (create_namespaced_pod, "new_pod"), + (patch_namespaced_pod, "pod_updates"), + (replace_namespaced_pod, "new_pod"), + ], ) -async def test_bad_v1_pod_kwargs(kubernetes_credentials, task_accepting_pod): +async def test_bad_v1_pod_kwargs(kubernetes_credentials, task_accepting_pod, pod_kwarg): with pytest.raises(TypeError): await task_accepting_pod.fn( - body=V1Pod(**{"random_not_real": "shabba-ranks"}), + **{pod_kwarg: V1Pod(skrrrt_skrrrt="yeehaw")}, kubernetes_credentials=kubernetes_credentials, ) From b4d640d706bb5b451798d39b2f3d936d904e5a29 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Mon, 14 Nov 2022 11:01:14 -0600 Subject: [PATCH 11/14] add test for `read_namespaced_pod_log` with custom print func --- prefect_kubernetes/credentials.py | 3 ++- prefect_kubernetes/pods.py | 35 ++++++++++++------------------- tests/test_jobs.py | 12 +++++------ tests/test_pods.py | 33 +++++++++++++++++++++++++---- 4 files changed, 50 insertions(+), 33 deletions(-) diff --git a/prefect_kubernetes/credentials.py b/prefect_kubernetes/credentials.py index 3dd4481..289fdb3 100644 --- a/prefect_kubernetes/credentials.py +++ b/prefect_kubernetes/credentials.py @@ -1,7 +1,7 @@ """Module for defining Kubernetes credential handling and client generation.""" from contextlib import contextmanager -from typing import Generator, Literal, Optional, Union +from typing import Generator, Optional, Union from kubernetes import config as kube_config from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, Configuration, CoreV1Api @@ -9,6 +9,7 @@ from prefect.blocks.core import Block from prefect.blocks.kubernetes import KubernetesClusterConfig from prefect.utilities.collections import listrepr +from typing_extensions import Literal KubernetesClient = Union[AppsV1Api, BatchV1Api, CoreV1Api] diff --git a/prefect_kubernetes/pods.py b/prefect_kubernetes/pods.py index 51effa8..a553bff 100644 --- a/prefect_kubernetes/pods.py +++ b/prefect_kubernetes/pods.py @@ -1,7 +1,6 @@ """Module for interacting with Kubernetes pods from Prefect flows.""" from typing import Any, Callable, Dict, Optional, Union -from kubernetes.client.exceptions import ApiException from kubernetes.client.models import V1DeleteOptions, V1Pod, V1PodList from kubernetes.watch import Watch from prefect import task @@ -235,7 +234,7 @@ def kubernetes_orchestrator(): @task -async def read_namespaced_pod_logs( +async def read_namespaced_pod_log( kubernetes_credentials: KubernetesCredentials, pod_name: str, container: str, @@ -245,6 +244,9 @@ async def read_namespaced_pod_logs( ) -> Union[str, None]: """Read logs from a Kubernetes pod in a given namespace. + If `print_func` is provided, the logs will be streamed using that function. + If the pod is no longer running, logs generated up to that point will be returned. + Args: kubernetes_credentials: `KubernetesCredentials` block for creating authenticated Kubernetes API clients. @@ -289,26 +291,15 @@ def kubernetes_orchestrator(): **kube_kwargs, ) - # From the `kubernetes.watch` documentation: - # Note that watching an API resource can expire. The method tries to - # resume automatically once from the last result, but if that last result - # is too old as well, an `ApiException` exception will be thrown with - # ``code`` 410. - - while True: - try: - for log_line in Watch().stream( - core_v1_client.read_namespaced_pod_log, - name=pod_name, - namespace=namespace, - container=container, - ): - print_func(log_line) - return - - except ApiException as e: - if e.status != 410: - raise + # should no longer need to manually refresh on ApiException.status == 410 + # as of https://github.com/kubernetes-client/python-base/pull/133 + for log_line in Watch().stream( + core_v1_client.read_namespaced_pod_log, + name=pod_name, + namespace=namespace, + container=container, + ): + print_func(log_line) @task diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 7909e4b..a057bde 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -15,21 +15,21 @@ async def test_null_body_raises_error(kubernetes_credentials): with pytest.raises(ApiValueError): await create_namespaced_job.fn( - body=None, kubernetes_credentials=kubernetes_credentials + new_job=None, kubernetes_credentials=kubernetes_credentials ) with pytest.raises(ApiValueError): await patch_namespaced_job.fn( - body=None, job_name="", kubernetes_credentials=kubernetes_credentials + job_updates=None, job_name="", kubernetes_credentials=kubernetes_credentials ) with pytest.raises(ApiValueError): await replace_namespaced_job.fn( - body=None, job_name="", kubernetes_credentials=kubernetes_credentials + new_job=None, job_name="", kubernetes_credentials=kubernetes_credentials ) async def test_create_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await create_namespaced_job.fn( - body=V1Job(**{"metadata": {"name": "test-job"}}), + new_job=V1Job(metadata={"name": "test-job"}), a="test", kubernetes_credentials=kubernetes_credentials, ) @@ -64,7 +64,7 @@ async def test_list_namespaced_job(kubernetes_credentials, _mock_api_batch_clien async def test_patch_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await patch_namespaced_job.fn( - body=V1Job(**{"metadata": {"name": "test-job"}}), + job_updates=V1Job(metadata={"name": "test-job"}), job_name="test-job", a="test", kubernetes_credentials=kubernetes_credentials, @@ -93,7 +93,7 @@ async def test_read_namespaced_job(kubernetes_credentials, _mock_api_batch_clien async def test_replace_namespaced_job(kubernetes_credentials, _mock_api_batch_client): await replace_namespaced_job.fn( job_name="test-job", - body=V1Job(**{"metadata": {"name": "test-job"}}), + new_job=V1Job(metadata={"name": "test-job"}), namespace="ns", a="test", kubernetes_credentials=kubernetes_credentials, diff --git a/tests/test_pods.py b/tests/test_pods.py index 1b521ae..0ce14c4 100644 --- a/tests/test_pods.py +++ b/tests/test_pods.py @@ -8,7 +8,7 @@ list_namespaced_pod, patch_namespaced_pod, read_namespaced_pod, - read_namespaced_pod_logs, + read_namespaced_pod_log, replace_namespaced_pod, ) @@ -103,7 +103,7 @@ async def test_read_namespaced_pod(kubernetes_credentials, _mock_api_core_client async def test_read_namespaced_pod_logs(kubernetes_credentials, _mock_api_core_client): - await read_namespaced_pod_logs.fn( + await read_namespaced_pod_log.fn( pod_name="test_pod", container="test_container", namespace="ns", @@ -159,11 +159,36 @@ async def test_bad_v1_pod_kwargs(kubernetes_credentials, task_accepting_pod, pod ) -async def test_read_pod_logs_custom_print_func_timeout( +async def test_read_pod_log_custom_print_func( + kubernetes_credentials, _mock_api_core_client +): + s = await read_namespaced_pod_log.fn( + kubernetes_credentials=kubernetes_credentials, + pod_name="test_pod", + container="test_container", + namespace="ns", + print_func=print, + ) + + assert not s + + assert ( + _mock_api_core_client.read_namespaced_pod_log.call_args[1]["name"] == "test_pod" + ) + assert ( + _mock_api_core_client.read_namespaced_pod_log.call_args[1]["container"] + == "test_container" + ) + assert ( + _mock_api_core_client.read_namespaced_pod_log.call_args[1]["namespace"] == "ns" + ) + + +async def test_read_pod_log_custom_print_func_timeout( kubernetes_credentials, mock_stream_timeout ): with pytest.raises(ApiException): - await read_namespaced_pod_logs.fn( + await read_namespaced_pod_log.fn( kubernetes_credentials=kubernetes_credentials, pod_name="test_pod", container="test_container", From 199ca6c7bb2750cd21ca0ffa416fb4a8b7d1760e Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Tue, 15 Nov 2022 11:20:12 -0600 Subject: [PATCH 12/14] add test for pod log no timeout --- tests/conftest.py | 9 +++++++++ tests/test_pods.py | 13 ++----------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 6593f38..b26bc1d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -92,3 +92,12 @@ def mock_stream_timeout(monkeypatch): "kubernetes.watch.Watch.stream", MagicMock(side_effect=ApiException(status=408)), ) + + +@pytest.fixture +def mock_pod_log(monkeypatch): + + monkeypatch.setattr( + "kubernetes.watch.Watch.stream", + MagicMock(return_value=["test log"]), + ) diff --git a/tests/test_pods.py b/tests/test_pods.py index 0ce14c4..db3c8ec 100644 --- a/tests/test_pods.py +++ b/tests/test_pods.py @@ -160,7 +160,7 @@ async def test_bad_v1_pod_kwargs(kubernetes_credentials, task_accepting_pod, pod async def test_read_pod_log_custom_print_func( - kubernetes_credentials, _mock_api_core_client + kubernetes_credentials, _mock_api_core_client, mock_pod_log, capsys ): s = await read_namespaced_pod_log.fn( kubernetes_credentials=kubernetes_credentials, @@ -172,16 +172,7 @@ async def test_read_pod_log_custom_print_func( assert not s - assert ( - _mock_api_core_client.read_namespaced_pod_log.call_args[1]["name"] == "test_pod" - ) - assert ( - _mock_api_core_client.read_namespaced_pod_log.call_args[1]["container"] - == "test_container" - ) - assert ( - _mock_api_core_client.read_namespaced_pod_log.call_args[1]["namespace"] == "ns" - ) + assert capsys.readouterr().out == "test log\n" async def test_read_pod_log_custom_print_func_timeout( From fe11e873ed9fb8aac0ae938a7c2704c693e0ad0d Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Tue, 15 Nov 2022 11:39:16 -0600 Subject: [PATCH 13/14] typo --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 357f8bf..8d79171 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `KubernetesCredentials` block for generating authenticated Kubernetes clients - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19) - Tasks for interacting with `job` resources: `create_namespaced_job`, `delete_namespaced_job`, `list_namespaced_job`, `patch_namespaced_job`, `read_namespaced_job`, `replace_namespaced_job` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19) -- Tasks for interacting with `pod` resources: `create_namespaced_pod`, `delete_namespaced_pod`, `list_namespaced_pod`, `patch_namespaced_pod`, `read_namespaced_pod`, `read_namespaced_pod_logs`, `replace_namespaced_pod` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/21) +- Tasks for interacting with `pod` resources: `create_namespaced_pod`, `delete_namespaced_pod`, `list_namespaced_pod`, `patch_namespaced_pod`, `read_namespaced_pod`, `read_namespaced_pod_logs`, `replace_namespaced_pod` - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21) ### Changed - `KubernetesCredentials` block to use a single `get_client` method capable of creating all resource-specific client types - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21) From 93597cc2ebf3961d6cd2f1ed30116d8627876c74 Mon Sep 17 00:00:00 2001 From: Nathan Nowack Date: Wed, 16 Nov 2022 10:33:02 -0600 Subject: [PATCH 14/14] word --- prefect_kubernetes/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prefect_kubernetes/jobs.py b/prefect_kubernetes/jobs.py index b7771c9..161cba8 100644 --- a/prefect_kubernetes/jobs.py +++ b/prefect_kubernetes/jobs.py @@ -157,7 +157,7 @@ async def patch_namespaced_job( namespace: Optional[str] = "default", **kube_kwargs: Dict[str, Any], ) -> V1Job: - """Task for deleting a namespaced Kubernetes job. + """Task for patching a namespaced Kubernetes job. Args: kubernetes_credentials: KubernetesCredentials block