Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

adds pod tasks #21

Merged
merged 16 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `KubernetesCredentials` block for generating authenticated Kubernetes clients - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)
- Tasks for interacting with `job` resources: `create_namespaced_job`, `delete_namespaced_job`, `list_namespaced_job`, `patch_namespaced_job`, `read_namespaced_job`, `replace_namespaced_job` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)
- Tasks for interacting with `pod` resources: `create_namespaced_pod`, `delete_namespaced_pod`, `list_namespaced_pod`, `patch_namespaced_pod`, `read_namespaced_pod`, `read_namespaced_pod_logs`, `replace_namespaced_pod` - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21)

### Changed
- `KubernetesCredentials` block to use a single `get_client` method capable of creating all resource-specific client types - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21)


### Deprecated

Expand Down
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Welcome!

Prefect integrations for interacting with prefect-kubernetes.
Prefect integrations for interacting with Kubernetes resources.

## Getting Started

Expand All @@ -23,6 +23,21 @@ pip install prefect-kubernetes
```

### Write and run a flow
#### Generate a resource-specific client from `KubernetesClusterConfig`

```python
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect_kubernetes.credentials import KubernetesCredentials

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

k8s_credentials = KubernetesCredentials(cluster_config=k8s_config)

with k8s_credentials.get_client("core") as v1_core_client:
for pod in v1_core_client.list_namespaced_pod('default').items:
print(pod.metadata.name)
```

#### List jobs in a specific namespace

```python
Expand All @@ -32,9 +47,27 @@ from prefect_kubernetes.jobs import list_namespaced_job

@flow
def kubernetes_orchestrator():
namespaced_job_list = list_namespaced_job(
v1_job_list = list_namespaced_job(
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
namespace="my-namespace",
)
```

#### Delete a pod using `V1DeleteOptions`

```python
from kubernetes.client.models import V1DeleteOptions

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.pods import delete_namespaced_pod

@flow
def kubernetes_orchestrator():
v1_pod_list = delete_namespaced_pod(
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
body=V1DeleteOptions(grace_period_seconds=42),
namespace="my-namespace"
)
```

Expand Down
1 change: 1 addition & 0 deletions docs/pods.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: prefect_kubernetes.pods
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ nav:
- Credentials: credentials.md
- Exceptions: exceptions.md
- Jobs: jobs.md
- Pods: pods.md
100 changes: 61 additions & 39 deletions prefect_kubernetes/credentials.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
"""Module for defining Kubernetes credential handling and client generation."""

from contextlib import contextmanager
from typing import Generator, Optional
from typing import Generator, Optional, Union

from kubernetes import config as kube_config
from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, CoreV1Api
from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, Configuration, CoreV1Api
from kubernetes.config.config_exception import ConfigException
from prefect.blocks.core import Block
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect.utilities.collections import listrepr
from typing_extensions import Literal

KubernetesClient = Union[AppsV1Api, BatchV1Api, CoreV1Api]

K8S_CLIENT_TYPES = {
"apps": AppsV1Api,
"batch": BatchV1Api,
"core": CoreV1Api,
}
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved


class KubernetesCredentials(Block):
Expand All @@ -27,13 +37,19 @@ class KubernetesCredentials(Block):

Create resource-specific API clients from KubernetesCredentials:
```python
from kubernetes.client.models import AppsV1Api, BatchV1Api, CoreV1Api
from prefect_kubernetes import KubernetesCredentials

kubernetes_credentials = KubernetesCredentials.load("my-k8s-credentials")

kubernetes_app_v1_client = kubernetes_credentials.get_app_client()
kubernetes_batch_v1_client = kubernetes_credentials.get_batch_client()
kubernetes_core_v1_client = kubernetes_credentials.get_core_client()
with kubernetes_credentials.get_client("apps") as v1_apps_client:
assert isinstance(v1_apps_client, AppsV1Api)

with kubernetes_credentials.get_client("batch") as v1_batch_client:
assert isinstance(v1_batch_client, BatchV1Api)

with kubernetes_credentials.get_client("core") as v1_core_client:
assert isinstance(v1_core_client, CoreV1Api)
```

Create a namespaced job:
Expand All @@ -49,8 +65,8 @@ class KubernetesCredentials(Block):
@flow
def kubernetes_orchestrator():
create_namespaced_job(
body=V1Job(**{"Marvin": "42"}),
kubernetes_credentials=kubernetes_credentials
kubernetes_credentials=kubernetes_credentials,
body=V1Job(**{"metadata": {"name": "my-job"}}),
)
```
"""
Expand All @@ -61,46 +77,40 @@ def kubernetes_orchestrator():
cluster_config: Optional[KubernetesClusterConfig] = None

@contextmanager
def get_app_client(self) -> Generator[AppsV1Api, None, None]:
def get_client(
self,
client_type: Literal["apps", "batch", "core"],
configuration: Optional[Configuration] = None,
) -> Generator[KubernetesClient, None, None]:
"""Convenience method for retrieving a Kubernetes API client for deployment resources.

Yields:
Kubernetes API client to interact with "deployment" resources.
"""
with self.get_generic_client() as generic_client:
try:
yield AppsV1Api(api_client=generic_client)
finally:
generic_client.rest_client.pool_manager.clear()

@contextmanager
def get_batch_client(self) -> Generator[BatchV1Api, None, None]:
"""Convenience method for retrieving a Kubernetes API client for job resources.
Args:
client_type: The resource-specific type of Kubernetes client to retrieve.

Yields:
Kubernetes API client to interact with "job" resources.
"""
with self.get_generic_client() as generic_client:
try:
yield BatchV1Api(api_client=generic_client)
finally:
generic_client.rest_client.pool_manager.clear()
An authenticated, resource-specific Kubernetes API client.

@contextmanager
def get_core_client(self) -> Generator[CoreV1Api, None, None]:
"""Convenience method for retrieving a Kubernetes API client for core resources.
Example:
```python
from prefect_kubernetes.credentials import KubernetesCredentials

Yields:
Kubernetes API client to interact with "pod", "service"
and "secret" resources.
with KubernetesCredentials.get_client("core") as core_v1_client:
for pod in core_v1_client.list_namespaced_pod():
print(pod.metadata.name)
```
"""
with self.get_generic_client() as generic_client:
client_config = configuration or Configuration()

with ApiClient(configuration=client_config) as generic_client:
try:
yield CoreV1Api(api_client=generic_client)
yield self.get_resource_specific_client(client_type)
finally:
generic_client.rest_client.pool_manager.clear()

def get_generic_client(self) -> ApiClient:
def get_resource_specific_client(
self,
client_type: str,
) -> Union[AppsV1Api, BatchV1Api, CoreV1Api]:
"""
Utility function for configuring a generic Kubernetes client.
It will attempt to connect to a Kubernetes cluster in three steps with
Expand All @@ -116,8 +126,15 @@ def get_generic_client(self) -> ApiClient:
3. Attempt out-of-cluster connection using the default location for a
kube config file.

Args:
client_type: The Kubernetes API client type for interacting with specific
Kubernetes resources.

Returns:
An authenticated, generic Kubernetes Client.
KubernetesClient: An authenticated, resource-specific Kubernetes Client.

Raises:
ValueError: If `client_type` is not a valid Kubernetes API client type.
"""

if self.cluster_config:
Expand All @@ -128,5 +145,10 @@ def get_generic_client(self) -> ApiClient:
except ConfigException:
kube_config.load_kube_config()

client = ApiClient()
return client
try:
return K8S_CLIENT_TYPES[client_type]()
except KeyError:
raise ValueError(
f"Invalid client type provided '{client_type}'."
f" Must be one of {listrepr(K8S_CLIENT_TYPES.keys())}."
)
Loading