Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #21 from PrefectHQ/pod-tasks
Browse files Browse the repository at this point in the history
adds `pod` tasks
  • Loading branch information
zzstoatzz authored Nov 16, 2022
2 parents 781ca60 + 93597cc commit 7d32458
Show file tree
Hide file tree
Showing 11 changed files with 739 additions and 109 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `KubernetesCredentials` block for generating authenticated Kubernetes clients - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)
- Tasks for interacting with `job` resources: `create_namespaced_job`, `delete_namespaced_job`, `list_namespaced_job`, `patch_namespaced_job`, `read_namespaced_job`, `replace_namespaced_job` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)
- Tasks for interacting with `pod` resources: `create_namespaced_pod`, `delete_namespaced_pod`, `list_namespaced_pod`, `patch_namespaced_pod`, `read_namespaced_pod`, `read_namespaced_pod_logs`, `replace_namespaced_pod` - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21)

### Changed
- `KubernetesCredentials` block to use a single `get_client` method capable of creating all resource-specific client types - [#21](https://github.com/PrefectHQ/prefect-kubernetes/pull/21)


### Deprecated

Expand Down
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Welcome!

Prefect integrations for interacting with prefect-kubernetes.
Prefect integrations for interacting with Kubernetes resources.

## Getting Started

Expand All @@ -23,6 +23,21 @@ pip install prefect-kubernetes
```

### Write and run a flow
#### Generate a resource-specific client from `KubernetesClusterConfig`

```python
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect_kubernetes.credentials import KubernetesCredentials

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

k8s_credentials = KubernetesCredentials(cluster_config=k8s_config)

with k8s_credentials.get_client("core") as v1_core_client:
for pod in v1_core_client.list_namespaced_pod('default').items:
print(pod.metadata.name)
```

#### List jobs in a specific namespace

```python
Expand All @@ -32,9 +47,27 @@ from prefect_kubernetes.jobs import list_namespaced_job

@flow
def kubernetes_orchestrator():
namespaced_job_list = list_namespaced_job(
v1_job_list = list_namespaced_job(
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
namespace="my-namespace",
)
```

#### Delete a pod using `V1DeleteOptions`

```python
from kubernetes.client.models import V1DeleteOptions

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.pods import delete_namespaced_pod

@flow
def kubernetes_orchestrator():
v1_pod_list = delete_namespaced_pod(
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
body=V1DeleteOptions(grace_period_seconds=42),
namespace="my-namespace"
)
```

Expand Down
1 change: 1 addition & 0 deletions docs/pods.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: prefect_kubernetes.pods
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ nav:
- Credentials: credentials.md
- Exceptions: exceptions.md
- Jobs: jobs.md
- Pods: pods.md
100 changes: 61 additions & 39 deletions prefect_kubernetes/credentials.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
"""Module for defining Kubernetes credential handling and client generation."""

from contextlib import contextmanager
from typing import Generator, Optional
from typing import Generator, Optional, Union

from kubernetes import config as kube_config
from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, CoreV1Api
from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, Configuration, CoreV1Api
from kubernetes.config.config_exception import ConfigException
from prefect.blocks.core import Block
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect.utilities.collections import listrepr
from typing_extensions import Literal

KubernetesClient = Union[AppsV1Api, BatchV1Api, CoreV1Api]

K8S_CLIENT_TYPES = {
"apps": AppsV1Api,
"batch": BatchV1Api,
"core": CoreV1Api,
}


class KubernetesCredentials(Block):
Expand All @@ -27,13 +37,19 @@ class KubernetesCredentials(Block):
Create resource-specific API clients from KubernetesCredentials:
```python
from kubernetes.client.models import AppsV1Api, BatchV1Api, CoreV1Api
from prefect_kubernetes import KubernetesCredentials
kubernetes_credentials = KubernetesCredentials.load("my-k8s-credentials")
kubernetes_app_v1_client = kubernetes_credentials.get_app_client()
kubernetes_batch_v1_client = kubernetes_credentials.get_batch_client()
kubernetes_core_v1_client = kubernetes_credentials.get_core_client()
with kubernetes_credentials.get_client("apps") as v1_apps_client:
assert isinstance(v1_apps_client, AppsV1Api)
with kubernetes_credentials.get_client("batch") as v1_batch_client:
assert isinstance(v1_batch_client, BatchV1Api)
with kubernetes_credentials.get_client("core") as v1_core_client:
assert isinstance(v1_core_client, CoreV1Api)
```
Create a namespaced job:
Expand All @@ -49,8 +65,8 @@ class KubernetesCredentials(Block):
@flow
def kubernetes_orchestrator():
create_namespaced_job(
body=V1Job(**{"Marvin": "42"}),
kubernetes_credentials=kubernetes_credentials
kubernetes_credentials=kubernetes_credentials,
body=V1Job(**{"metadata": {"name": "my-job"}}),
)
```
"""
Expand All @@ -61,46 +77,40 @@ def kubernetes_orchestrator():
cluster_config: Optional[KubernetesClusterConfig] = None

@contextmanager
def get_app_client(self) -> Generator[AppsV1Api, None, None]:
def get_client(
self,
client_type: Literal["apps", "batch", "core"],
configuration: Optional[Configuration] = None,
) -> Generator[KubernetesClient, None, None]:
"""Convenience method for retrieving a Kubernetes API client for deployment resources.
Yields:
Kubernetes API client to interact with "deployment" resources.
"""
with self.get_generic_client() as generic_client:
try:
yield AppsV1Api(api_client=generic_client)
finally:
generic_client.rest_client.pool_manager.clear()

@contextmanager
def get_batch_client(self) -> Generator[BatchV1Api, None, None]:
"""Convenience method for retrieving a Kubernetes API client for job resources.
Args:
client_type: The resource-specific type of Kubernetes client to retrieve.
Yields:
Kubernetes API client to interact with "job" resources.
"""
with self.get_generic_client() as generic_client:
try:
yield BatchV1Api(api_client=generic_client)
finally:
generic_client.rest_client.pool_manager.clear()
An authenticated, resource-specific Kubernetes API client.
@contextmanager
def get_core_client(self) -> Generator[CoreV1Api, None, None]:
"""Convenience method for retrieving a Kubernetes API client for core resources.
Example:
```python
from prefect_kubernetes.credentials import KubernetesCredentials
Yields:
Kubernetes API client to interact with "pod", "service"
and "secret" resources.
with KubernetesCredentials.get_client("core") as core_v1_client:
for pod in core_v1_client.list_namespaced_pod():
print(pod.metadata.name)
```
"""
with self.get_generic_client() as generic_client:
client_config = configuration or Configuration()

with ApiClient(configuration=client_config) as generic_client:
try:
yield CoreV1Api(api_client=generic_client)
yield self.get_resource_specific_client(client_type)
finally:
generic_client.rest_client.pool_manager.clear()

def get_generic_client(self) -> ApiClient:
def get_resource_specific_client(
self,
client_type: str,
) -> Union[AppsV1Api, BatchV1Api, CoreV1Api]:
"""
Utility function for configuring a generic Kubernetes client.
It will attempt to connect to a Kubernetes cluster in three steps with
Expand All @@ -116,8 +126,15 @@ def get_generic_client(self) -> ApiClient:
3. Attempt out-of-cluster connection using the default location for a
kube config file.
Args:
client_type: The Kubernetes API client type for interacting with specific
Kubernetes resources.
Returns:
An authenticated, generic Kubernetes Client.
KubernetesClient: An authenticated, resource-specific Kubernetes Client.
Raises:
ValueError: If `client_type` is not a valid Kubernetes API client type.
"""

if self.cluster_config:
Expand All @@ -128,5 +145,10 @@ def get_generic_client(self) -> ApiClient:
except ConfigException:
kube_config.load_kube_config()

client = ApiClient()
return client
try:
return K8S_CLIENT_TYPES[client_type]()
except KeyError:
raise ValueError(
f"Invalid client type provided '{client_type}'."
f" Must be one of {listrepr(K8S_CLIENT_TYPES.keys())}."
)
Loading

0 comments on commit 7d32458

Please sign in to comment.