Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #19 from PrefectHQ/job-tasks
Browse files Browse the repository at this point in the history
adds `job` tasks
  • Loading branch information
zzstoatzz authored Nov 9, 2022
2 parents 1029837 + 368ff4c commit 781ca60
Show file tree
Hide file tree
Showing 15 changed files with 684 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,4 @@ dmypy.json
.vscode

# Jupyter notebook
*.ipynb
*.ipynb
10 changes: 2 additions & 8 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased

### Added
- `KubernetesCredentials` block for generating authenticated Kubernetes clients - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)
- Tasks for interacting with `job` resources: `create_namespaced_job`, `delete_namespaced_job`, `list_namespaced_job`, `patch_namespaced_job`, `read_namespaced_job`, `replace_namespaced_job` - [#19](https://github.com/PrefectHQ/prefect-kubernetes/pull/19)

### Changed

Expand All @@ -18,11 +20,3 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

### Security

## 0.1.0

Released on ????? ?th, 20??.

### Added

- `task_name` task - [#1](https://github.com/PrefectHQ/prefect-kubernetes/pull/1)
18 changes: 8 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,19 @@ pip install prefect-kubernetes
```

### Write and run a flow
#### List jobs in a specific namespace

```python
from prefect import flow
from prefect_kubernetes.tasks import (
goodbye_prefect_kubernetes,
hello_prefect_kubernetes,
)

from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.jobs import list_namespaced_job

@flow
def example_flow():
hello_prefect_kubernetes
goodbye_prefect_kubernetes

example_flow()
def kubernetes_orchestrator():
namespaced_job_list = list_namespaced_job(
namespace="my-namespace",
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
)
```

## Resources
Expand Down
1 change: 1 addition & 0 deletions docs/credentials.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: prefect_kubernetes.credentials
1 change: 1 addition & 0 deletions docs/exceptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: prefect_kubernetes.exceptions
1 change: 1 addition & 0 deletions docs/jobs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
::: prefect_kubernetes.jobs
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ plugins:
nav:
- Home: index.md
- Credentials: credentials.md
- Exceptions: exceptions.md
- Jobs: jobs.md
132 changes: 132 additions & 0 deletions prefect_kubernetes/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Module for defining Kubernetes credential handling and client generation."""

from contextlib import contextmanager
from typing import Generator, Optional

from kubernetes import config as kube_config
from kubernetes.client import ApiClient, AppsV1Api, BatchV1Api, CoreV1Api
from kubernetes.config.config_exception import ConfigException
from prefect.blocks.core import Block
from prefect.blocks.kubernetes import KubernetesClusterConfig


class KubernetesCredentials(Block):
"""Credentials block for generating configured Kubernetes API clients.
Attributes:
cluster_config: A `KubernetesClusterConfig` block holding a JSON kube
config for a specific kubernetes context.
Examples:
Load stored Kubernetes credentials:
```python
from prefect_kubernetes.credentials import KubernetesCredentials
kubernetes_credentials = KubernetesCredentials.load("my-k8s-credentials")
```
Create resource-specific API clients from KubernetesCredentials:
```python
from prefect_kubernetes import KubernetesCredentials
kubernetes_credentials = KubernetesCredentials.load("my-k8s-credentials")
kubernetes_app_v1_client = kubernetes_credentials.get_app_client()
kubernetes_batch_v1_client = kubernetes_credentials.get_batch_client()
kubernetes_core_v1_client = kubernetes_credentials.get_core_client()
```
Create a namespaced job:
```python
from prefect import flow
from prefect_kubernetes import KubernetesCredentials
from prefect_kubernetes.job import create_namespaced_job
from kubernetes.client.models import V1Job
kubernetes_credentials = KubernetesCredentials.load("my-k8s-credentials")
@flow
def kubernetes_orchestrator():
create_namespaced_job(
body=V1Job(**{"Marvin": "42"}),
kubernetes_credentials=kubernetes_credentials
)
```
"""

_block_type_name = "Kubernetes Credentials"
_logo_url = "https://images.ctfassets.net/zscdif0zqppk/oYuHjIbc26oilfQSEMjRv/a61f5f6ef406eead2df5231835b4c4c2/logo.png?h=250" # noqa

cluster_config: Optional[KubernetesClusterConfig] = None

@contextmanager
def get_app_client(self) -> Generator[AppsV1Api, None, None]:
"""Convenience method for retrieving a Kubernetes API client for deployment resources.
Yields:
Kubernetes API client to interact with "deployment" resources.
"""
with self.get_generic_client() as generic_client:
try:
yield AppsV1Api(api_client=generic_client)
finally:
generic_client.rest_client.pool_manager.clear()

@contextmanager
def get_batch_client(self) -> Generator[BatchV1Api, None, None]:
"""Convenience method for retrieving a Kubernetes API client for job resources.
Yields:
Kubernetes API client to interact with "job" resources.
"""
with self.get_generic_client() as generic_client:
try:
yield BatchV1Api(api_client=generic_client)
finally:
generic_client.rest_client.pool_manager.clear()

@contextmanager
def get_core_client(self) -> Generator[CoreV1Api, None, None]:
"""Convenience method for retrieving a Kubernetes API client for core resources.
Yields:
Kubernetes API client to interact with "pod", "service"
and "secret" resources.
"""
with self.get_generic_client() as generic_client:
try:
yield CoreV1Api(api_client=generic_client)
finally:
generic_client.rest_client.pool_manager.clear()

def get_generic_client(self) -> ApiClient:
"""
Utility function for configuring a generic Kubernetes client.
It will attempt to connect to a Kubernetes cluster in three steps with
the first successful connection attempt becoming the mode of communication with
a cluster:
1. It will first attempt to use a `KubernetesCredentials` block's
`cluster_config` to configure a client using
`KubernetesClusterConfig.configure_client`.
2. Attempt in-cluster connection (will only work when running on a pod).
3. Attempt out-of-cluster connection using the default location for a
kube config file.
Returns:
An authenticated, generic Kubernetes Client.
"""

if self.cluster_config:
self.cluster_config.configure_client()
else:
try:
kube_config.load_incluster_config()
except ConfigException:
kube_config.load_kube_config()

client = ApiClient()
return client
15 changes: 15 additions & 0 deletions prefect_kubernetes/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Module to define common exceptions within `prefect_kubernetes`."""

from kubernetes.client.exceptions import ApiException, OpenApiException


class KubernetesJobDefinitionError(OpenApiException):
"""An exception for when a Kubernetes job definition is invalid."""


class KubernetesJobFailedError(OpenApiException):
"""An exception for when a Kubernetes job fails."""


class KubernetesResourceNotFoundError(ApiException):
"""An exception for when a Kubernetes resource cannot be found by a client."""
Loading

0 comments on commit 781ca60

Please sign in to comment.