-
Notifications
You must be signed in to change notification settings - Fork 3
/
charm.py
executable file
·486 lines (424 loc) · 19.6 KB
/
charm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
"""A Juju Charm for Kubeflow Profiles Operator."""
import json
import logging
from pathlib import Path
from charmed_kubeflow_chisme.exceptions import ErrorWithStatus, GenericCharmRuntimeError
from charmed_kubeflow_chisme.kubernetes import KubernetesResourceHandler as KRH # noqa: N817
from charmed_kubeflow_chisme.lightkube.batch import delete_many
from charmed_kubeflow_chisme.pebble import update_layer
from charms.loki_k8s.v1.loki_push_api import LogForwarder
from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
from lightkube import ApiError, LoadResourceError, codecs
from lightkube.generic_resource import create_global_resource, load_in_cluster_generic_resources
from lightkube.models.core_v1 import ServicePort
from lightkube.models.meta_v1 import ObjectMeta
from lightkube.resources.core_v1 import Namespace, Secret
from ops.charm import ActionEvent, CharmBase
from ops.framework import StoredState
from ops.main import main
from ops.model import ActiveStatus, BlockedStatus, Container, MaintenanceStatus, WaitingStatus
from ops.pebble import ChangeError, Layer
from serialized_data_interface import NoCompatibleVersions, NoVersionsListed, get_interfaces
from tenacity import Retrying, stop_after_attempt, stop_after_delay, wait_exponential
# All manifests and templates shipped with the charm live under this directory.
_TEMPLATES_DIR = "src/templates"

# Jinja2 templates rendered and applied by the KubernetesResourceHandler
# (passed as its ``template_files``).
K8S_RESOURCE_FILES = [
    f"{_TEMPLATES_DIR}/auth_manifests.yaml.j2",
    f"{_TEMPLATES_DIR}/crds.yaml.j2",
]

# Read by the charm and pushed into the profiles container at
# /etc/profile-controller/namespace-labels.yaml.
NAMESPACE_LABELS_FILE = f"{_TEMPLATES_DIR}/namespace-labels.yaml"

# Manifests applied into a profile's namespace by the create/initialise
# profile actions.
PROFILE_CONFIG_FILES = [
    f"{_TEMPLATES_DIR}/allow-minio.yaml",
    f"{_TEMPLATES_DIR}/allow-mlflow.yaml",
]
class KubeflowProfilesOperator(CharmBase):
    """A Juju Charm for Kubeflow Profiles Operator.

    Manages two Pebble workloads in the same unit: the profile controller
    (``kubeflow-profiles`` container) and Kubeflow access management
    (``kubeflow-kfam`` container). It also applies the manifests rendered from
    ``K8S_RESOURCE_FILES`` and implements the ``create-profile`` and
    ``initialise-profile`` actions.
    """

    _stored = StoredState()

    def __init__(self, *args):
        """Initialize charm and setup the container."""
        super().__init__(*args)

        self.log = logging.getLogger(__name__)

        self._manager_port = self.model.config["manager-port"]
        self._kfam_port = self.model.config["port"]
        manager_port = ServicePort(int(self._manager_port), name="manager")
        kfam_port = ServicePort(int(self._kfam_port), name="http")
        self.service_patcher = KubernetesServicePatch(
            self, [manager_port, kfam_port], service_name=f"{self.model.app.name}"
        )

        self._profiles_container_name = "kubeflow-profiles"
        self._profiles_container = self.unit.get_container(self._profiles_container_name)
        self._kfam_container_name = "kubeflow-kfam"
        self._kfam_container = self.unit.get_container(self._kfam_container_name)
        self._namespace = self.model.name
        self._name = self.model.app.name
        self._lightkube_field_manager = "lightkube"
        # Built lazily by the ``k8s_resource_handler`` property.
        self._k8s_resource_handler = None

        # service account names are hardcoded
        # TODO: implement relation and get from relation data
        # tracked in https://github.com/canonical/kubeflow-profiles-operator/issues/156
        self._istio_gateway_principal = (
            "cluster.local/ns/kubeflow/sa/istio-ingressgateway-workload-service-account"
        )
        self._notebook_controller_principal = "cluster.local/ns/kubeflow/sa/jupyter-controller"
        self._kfp_ui_principal = "cluster.local/ns/kubeflow/sa/kfp-ui"

        # setup events to be handled by specific event handlers
        self.framework.observe(self.on.install, self._on_install)
        self.framework.observe(self.on.remove, self._on_remove)
        # The Service ports come from config, so re-run the Service patch on
        # config changes (config_changed is also handled by _on_event below).
        self.framework.observe(self.on.config_changed, self.service_patcher._patch)
        self.framework.observe(
            self.on.kubeflow_profiles_pebble_ready, self._on_profiles_pebble_ready
        )
        self.framework.observe(self.on.kubeflow_kfam_pebble_ready, self._on_kfam_pebble_ready)
        self.framework.observe(self.on.create_profile_action, self.on_create_profile_action)
        self.framework.observe(
            self.on.initialise_profile_action, self.on_initialise_profile_action
        )

        # setup events to be handled by main event handler
        self.framework.observe(self.on.upgrade_charm, self._on_event)
        self.framework.observe(self.on.config_changed, self._on_event)
        for rel in self.model.relations.keys():
            self.framework.observe(self.on[rel].relation_changed, self._on_event)

        self._logging = LogForwarder(charm=self)

        self.prometheus_provider = MetricsEndpointProvider(
            self,
            jobs=[{"static_configs": [{"targets": ["*:8080", "*:8081"]}]}],
            refresh_event=[
                self.on.kubeflow_profiles_pebble_ready,
                self.on.kubeflow_kfam_pebble_ready,
            ],
        )

    @property
    def profiles_container(self):
        """Return profiles container."""
        return self._profiles_container

    @property
    def kfam_container(self):
        """Return kfam container."""
        return self._kfam_container

    @property
    def _context(self):
        """Return the template context used when rendering K8S resources."""
        context = {"app_name": self.model.app.name, "model_name": self.model.name}
        return context

    @property
    def _profiles_service_environment(self):
        """Return environment variables for kubeflow-profiles container."""
        return {
            "ISTIO_INGRESS_GATEWAY_PRINCIPAL": self._istio_gateway_principal,  # noqa E501
            "NOTEBOOK_CONTROLLER_PRINCIPAL": self._notebook_controller_principal,
            "KFP_UI_PRINCIPAL": self._kfp_ui_principal,
        }

    @property
    def _kfam_service_environment(self):
        """Return environment variables for kubeflow-kfam container."""
        return {
            "ISTIO_INGRESS_GATEWAY_PRINCIPAL": self._istio_gateway_principal,  # noqa E501
            "KFP_UI_PRINCIPAL": self._kfp_ui_principal,
        }

    @property
    def k8s_resource_handler(self):
        """Return the KubernetesResourceHandler, creating it on first access.

        The handler renders ``K8S_RESOURCE_FILES`` with ``self._context``. When it
        is first created, ``load_in_cluster_generic_resources`` is also called with
        the handler's lightkube client.
        """
        if not self._k8s_resource_handler:
            self._k8s_resource_handler = KRH(
                field_manager=self._lightkube_field_manager,
                template_files=K8S_RESOURCE_FILES,
                context=self._context,
                logger=self.log,
            )
        load_in_cluster_generic_resources(self._k8s_resource_handler.lightkube_client)
        return self._k8s_resource_handler

    @k8s_resource_handler.setter
    def k8s_resource_handler(self, handler: KRH):
        """Set K8S resource handler."""
        self._k8s_resource_handler = handler

    @property
    def _profiles_pebble_layer(self) -> Layer:
        """Return the Profiles Pebble layer for the workload."""
        return Layer(
            {
                "services": {
                    self._profiles_container_name: {
                        "override": "replace",
                        "summary": "entry point for kubeflow profiles",
                        "command": (
                            "/manager " "-userid-header " "kubeflow-userid " "-userid-prefix " '""'
                        ),
                        "environment": self._profiles_service_environment,
                        "startup": "enabled",
                    }
                },
                "checks": {
                    "kubeflow-profiles-get": {
                        "override": "replace",
                        "period": "30s",
                        "http": {"url": "http://localhost:8080/metrics"},
                    }
                },
            }
        )

    @property
    def _kfam_pebble_layer(self) -> Layer:
        """Return the kfam Pebble layer for the workload."""
        return Layer(
            {
                "services": {
                    self._kfam_container_name: {
                        "override": "replace",
                        "summary": "entry point for kubeflow access management",
                        "command": (
                            "/access-management "
                            "-cluster-admin "
                            "admin "
                            "-userid-header "
                            "kubeflow-userid "
                            "-userid-prefix "
                            '""'
                        ),
                        "environment": self._kfam_service_environment,
                        "startup": "enabled",
                    }
                },
                "checks": {
                    "kubeflow-kfam-get": {
                        "override": "replace",
                        "period": "30s",
                        "http": {"url": "http://localhost:8081/metrics"},
                    }
                },
            }
        )

    def _deploy_k8s_resources(self):
        """Deploy K8S resources.

        Raises:
            GenericCharmRuntimeError: if applying the resources fails with an ApiError.
        """
        try:
            self.unit.status = MaintenanceStatus("Creating K8S resources")
            self.k8s_resource_handler.apply()
        except ApiError as e:
            raise GenericCharmRuntimeError("Failed to create K8S resources") from e
        self.model.unit.status = MaintenanceStatus("K8S resources created")

    def _check_container_connection(self, container: Container) -> None:
        """Check if connection can be made with container.

        Args:
            container: the named container in a unit to check.

        Raises:
            ErrorWithStatus if the connection cannot be made.
        """
        if not container.can_connect():
            raise ErrorWithStatus("Pod startup is not complete", MaintenanceStatus)

    def _on_install(self, _):
        """Installation only tasks."""
        # deploy K8S resources to speed up deployment
        self._deploy_k8s_resources()

    def _update_profiles_layer(self) -> None:
        """Update the Profile Pebble layer if changed.

        When the services in the current plan differ from the desired layer:
        push the namespace labels file to the container, add the Pebble layer
        and replan.

        Raises:
            ErrorWithStatus: if the profiles container is not reachable yet.
            GenericCharmRuntimeError: if the replan fails.
        """
        if not self.profiles_container.can_connect():
            raise ErrorWithStatus("Waiting for pod startup to complete", MaintenanceStatus)
        current_layer = self.profiles_container.get_plan()
        if current_layer.services != self._profiles_pebble_layer.services:
            self._push_namespace_labels()
            self.profiles_container.add_layer(
                self._profiles_container_name, self._profiles_pebble_layer, combine=True
            )
            try:
                self.log.info("Pebble plan updated with new configuration, replanning")
                self.profiles_container.replan()
            except ChangeError as e:
                raise GenericCharmRuntimeError("Failed to replan") from e

    def _on_profiles_pebble_ready(self, event):
        """Update the started Profiles container."""
        # TODO: extract exception handling to _check_container_connection()
        try:
            self._check_container_connection(self.profiles_container)
        except ErrorWithStatus as error:
            # Report the reason on the unit status (``self.model.unit`` itself is
            # not assignable) and wait for a later event.
            self.unit.status = error.status
            return
        self._on_event(event)

    def _push_namespace_labels(self):
        """Push namespace labels to Profile container."""
        with open(NAMESPACE_LABELS_FILE, encoding="utf-8") as labels_file:
            labels = labels_file.read()
            self.profiles_container.push(
                "/etc/profile-controller/namespace-labels.yaml", labels, make_dirs=True
            )

    def _on_kfam_pebble_ready(self, event):
        """Update the started kfam container."""
        try:
            self._check_container_connection(self.kfam_container)
        except ErrorWithStatus as error:
            # Report the reason on the unit status (``self.model.unit`` itself is
            # not assignable) and wait for a later event.
            self.unit.status = error.status
            return
        self._on_event(event)

    def _on_remove(self, event):
        """Remove all resources rendered by the K8S resource handler.

        Raises:
            ApiError: re-raised after logging if deleting the resources fails.
        """
        self.unit.status = MaintenanceStatus("Removing k8s resources")
        manifests = self.k8s_resource_handler.render_manifests()
        try:
            delete_many(self.k8s_resource_handler.lightkube_client, manifests)
        except ApiError as e:
            self.log.warning("Failed to delete resources: %s with: %s", manifests, e)
            raise
        self.unit.status = MaintenanceStatus("K8s resources removed")

    def _send_info(self, interfaces):
        """Send Kubeflow Profiles interface info."""
        if interfaces["kubeflow-profiles"]:
            interfaces["kubeflow-profiles"].send_data(
                {
                    "service-name": self.model.app.name,
                    "service-port": str(self.model.config["port"]),
                }
            )

    def _check_leader(self):
        """Check if this unit is a leader.

        Raises:
            ErrorWithStatus: with WaitingStatus when this unit is not the leader.
        """
        if not self.unit.is_leader():
            # We can't do anything useful when not the leader, so do nothing.
            self.log.info("Not a leader, skipping setup")
            raise ErrorWithStatus("Waiting for leadership", WaitingStatus)

    def _get_interfaces(self):
        """Retrieve interface object.

        Raises:
            ErrorWithStatus: WaitingStatus if no versions are listed yet,
                BlockedStatus if no compatible versions exist.
        """
        try:
            interfaces = get_interfaces(self)
        except NoVersionsListed as err:
            raise ErrorWithStatus(err, WaitingStatus) from err
        except NoCompatibleVersions as err:
            raise ErrorWithStatus(err, BlockedStatus) from err
        return interfaces

    def on_initialise_profile_action(self, event: ActionEvent) -> None:
        """Handle the action to initialise an existing profile."""
        profile_name = event.params.get("profilename")
        self.configure_profile(profile_name, event)

    def on_create_profile_action(self, event: ActionEvent) -> None:
        """Handle the action to create a new profile and then configure it.

        The profile is only configured if it was actually created; if creation
        fails (e.g. the profile already exists) the action is already failed.
        """
        username = event.params.get("username")
        profile_name = event.params.get("profilename")
        resource_quota = event.params.get("resourcequota")
        event.log(
            f"Running action create-profile with parameters username={username}, profile_name={profile_name}, resource_quota={resource_quota}"  # noqa E501
        )
        if self.create_profile(username, profile_name, resource_quota, event):
            self.configure_profile(profile_name, event)

    def create_profile(self, username, profile_name, resource_quota, event: ActionEvent) -> bool:
        """Create a new Profile object owned by ``username``.

        Args:
            username: name of the User set as the profile owner.
            profile_name: name of the Profile to create.
            resource_quota: optional JSON string used as ``spec.resourceQuotaSpec``.
            event: the action event, used for logging and failure reporting.

        Returns:
            True if the Profile was created; False if the action was failed
            (invalid quota JSON, profile already exists, or an API error).
        """
        formatted_quota = None
        if resource_quota:
            # preprocess resource quota: it arrives as a JSON string
            try:
                formatted_quota = self._load_text_to_dict(resource_quota)
            except json.JSONDecodeError as e:
                event.fail(f"Failed to create profile: resourcequota is not valid JSON: {e}")
                return False

        profile = create_global_resource(
            group="kubeflow.org", version="v1", kind="Profile", plural="profiles"
        )

        # check if profile already exists; only a 404 means it is safe to create
        existing_profile = None
        try:
            existing_profile = self.k8s_resource_handler.lightkube_client.get(
                profile, name=profile_name, namespace=self._namespace
            )
        except ApiError as e:
            if e.status.code != 404:
                event.fail(f"Failed to check whether profile {profile_name} exists: {e}")
                return False
            event.log("Profile doesn't exist, action will proceed to create profile.")

        if existing_profile:
            event.fail(
                f"Failed to create profile: profile with name {profile_name} already exists."
            )
            return False

        my_profile = profile(
            metadata={"name": profile_name},
            spec={
                "owner": {"kind": "User", "name": username},
                "resourceQuotaSpec": formatted_quota,
            },
        )
        try:
            self.k8s_resource_handler.lightkube_client.create(my_profile)
        except ApiError as e:
            event.fail(f"Failed to create profile {profile_name}: {e}")
            return False
        event.log(f"Profile {profile_name} created.")
        return True

    def _load_text_to_dict(self, text):
        """Parse ``text`` as JSON and return the result.

        Raises:
            json.JSONDecodeError: if ``text`` is not valid JSON.
        """
        return json.loads(text)

    def configure_profile(self, profile_name, event: ActionEvent):
        """Add missing configurations to profile.

        Waits (with retries) for the profile's namespace to exist, applies each
        file in ``PROFILE_CONFIG_FILES`` into it, then copies the Seldon secret.
        """
        # NOTE(review): return value unused; presumably kept for its side effect of
        # registering the Profile generic resource with lightkube.
        create_global_resource(
            group="kubeflow.org", version="v1", kind="Profile", plural="profiles"
        )
        # attempt to get namespace
        try:
            for attempt in Retrying(
                stop=(stop_after_attempt(5) | stop_after_delay(30)),
                wait=wait_exponential(multiplier=1, min=5, max=10),
                reraise=True,
            ):
                with attempt:
                    self.k8s_resource_handler.lightkube_client.get(Namespace, name=profile_name)
        except ApiError:
            event.fail(f"Action failed. Profile {profile_name} was not found.")
            return

        try:
            for file in PROFILE_CONFIG_FILES:
                # TODO figure out which integrations are needed
                yaml_text = self._safe_load_file_to_text(file)
                self._apply_manifest(yaml_text, event, profile_name)
        except LoadResourceError as e:
            event.fail(
                f"Failed to apply PodDefaults: CRD not defined. To fix, deploy admission webhook. Error: {e}"  # noqa E501
            )
        self._copy_seldon_secret(profile_name, event)

    def _copy_seldon_secret(self, namespace, event: ActionEvent):
        """Copy Seldon deployment secret to the profile's namespace.

        Best effort: a missing source secret or a failed create is only logged
        to the action.
        """
        seldon_secret = None
        # check if seldon-core-mlflow integration secret exists in the charm's namespace
        try:
            seldon_secret = self.k8s_resource_handler.lightkube_client.get(
                Secret,
                name="mlflow-server-seldon-init-container-s3-credentials",
                namespace=self._namespace,
            )
        except ApiError as e:
            event.log(f"seldon secret not found in kubeflow namespace. error:{e}")

        if seldon_secret:
            try:
                self.k8s_resource_handler.lightkube_client.create(
                    Secret(
                        metadata=ObjectMeta(name="seldon-init-container-secret"),
                        kind="Secret",
                        apiVersion=seldon_secret.apiVersion,
                        data=seldon_secret.data,
                        type=seldon_secret.type,
                    ),
                    namespace=namespace,
                )
            except ApiError as e:
                event.log(
                    f"Failed to apply secret {seldon_secret.metadata.name} to namespace {namespace}. error:{e}"  # noqa E501
                )

    def _apply_manifest(self, manifest, event: ActionEvent, namespace=None):
        """Apply every object in the YAML ``manifest`` to ``namespace``.

        ApiErrors are logged to the action per object; LoadResourceError from
        parsing propagates to the caller.
        """
        for obj in codecs.load_all_yaml(manifest):
            try:
                self.k8s_resource_handler.lightkube_client.apply(obj, namespace=namespace)
            except ApiError as e:
                event.log(
                    f"Failed to apply manifest: {obj.metadata.name} to namespace: {namespace}. Error: {e}"  # noqa E501
                )

    def _safe_load_file_to_text(self, filename: str):
        """Return the contents of filename if it is an existing file, else it returns filename."""
        try:
            text = Path(filename).read_text(encoding="utf-8")
        except FileNotFoundError:
            text = filename
        return text

    def _on_event(self, event) -> None:
        """Perform all required actions for the Charm.

        Leader-only: sends relation data, applies K8S resources and updates both
        Pebble layers. ErrorWithStatus is reported on the unit status; otherwise
        the unit ends ActiveStatus.
        """
        try:
            self._check_leader()
            interfaces = self._get_interfaces()
            self._send_info(interfaces)
            self._deploy_k8s_resources()
            self._update_profiles_layer()
            update_layer(
                self._kfam_container_name, self.kfam_container, self._kfam_pebble_layer, self.log
            )
        except ErrorWithStatus as err:
            self.model.unit.status = err.status
            self.log.info(f"Failed to handle {event} with error: {str(err)}")
            return

        self.model.unit.status = ActiveStatus()
class CheckFailed(Exception):
    """Raise this exception if one of the checks in main fails.

    Attributes:
        msg: the failure message, converted to ``str``.
        status_type: the status class used to build ``status``.
        status: ``status_type(msg)``, ready to be assigned as a unit status.
    """

    def __init__(self, msg: str, status_type=None):
        """Initialize CheckFailed exception.

        Args:
            msg: description of the failed check; converted with ``str``.
            status_type: callable (e.g. an ops status class) that builds the
                status from the message. Defaults to ``BlockedStatus`` when omitted,
                so constructing without it no longer raises ``TypeError``.
        """
        self.msg = str(msg)
        # Pass the message to Exception so str(exc) and tracebacks show it.
        super().__init__(self.msg)
        if status_type is None:
            status_type = BlockedStatus
        self.status_type = status_type
        self.status = status_type(self.msg)
if __name__ == "__main__":
    # Run the charm through the ops framework when this file is executed as a script.
    main(KubeflowProfilesOperator)