-
Notifications
You must be signed in to change notification settings - Fork 0
/
k8s_service_info.py
385 lines (293 loc) · 13.3 KB
/
k8s_service_info.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
"""Library for sharing Kubernetes Services information.
This library offers a Python API for providing and requesting information about
any Kubernetes Service resource.
The default relation name is `k8s-svc-info` and it's recommended to use that name,
though if changed, you must ensure to pass the correct name when instantiating the
provider and requirer classes, as well as in `metadata.yaml`.
## Getting Started
### Fetching the library with charmcraft
Using charmcraft you can:
```shell
charmcraft fetch-lib charms.mlops_libs.v0.k8s_service_info
```
## Using the library as requirer
### Add relation to metadata.yaml
```yaml
requires:
k8s-svc-info:
interface: k8s-service
limit: 1
```
### Instantiate the KubernetesServiceInfoRequirer class in charm.py
```python
from ops.charm import CharmBase
from charms.mlops_libs.v0.kubernetes_service_info import KubernetesServiceInfoRequirer, KubernetesServiceInfoRelationError
class RequirerCharm(CharmBase):
def __init__(self, *args):
self._k8s_svc_info_requirer = KubernetesServiceInfoRequirer(self)
self.framework.observe(self.on.some_event_emitted, self.some_event_function)
def some_event_function():
# use the getter function wherever the info is needed
try:
k8s_svc_info_data = self._k8s_svc_info_requirer.get_data()
except KubernetesServiceInfoRelationError as error:
"your error handler goes here"
```
## Using the library as provider
### Add relation to metadata.yaml
```yaml
provides:
k8s-svc-info:
interface: k8s-service
```
### Instantiate the KubernetesServiceInfoProvider class in charm.py
```python
from ops.charm import CharmBase
from charms.mlops_libs.v0.kubernetes_service_info import KubernetesServiceInfoProvider, KubernetesServiceInfoRelationError
class ProviderCharm(CharmBase):
def __init__(self, *args, **kwargs):
...
self._k8s_svc_info_provider = KubernetesServiceInfoProvider(self)
self.observe(self.on.some_event, self._some_event_handler)
def _some_event_handler(self, ...):
# This will update the relation data bag with the Service name and port
try:
self._k8s_svc_info_provider.send_data(name, port)
except KubernetesServiceInfoRelationError as error:
"your error handler goes here"
```
## Relation data
The data shared by this library is:
* name: the name of the Kubernetes Service
as it appears in the resource metadata, e.g. "metadata-grpc-service".
* port: the port of the Kubernetes Service
"""
import logging
from typing import List, Optional, Union
from ops.charm import CharmBase, RelationEvent
from ops.framework import BoundEvent, EventSource, Object, ObjectEvents
from ops.model import Relation
from pydantic import BaseModel
# The unique Charmhub library identifier, never change it
LIBID = "f5c3f6cc023e40468d6f9a871e8afcd0"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 1
# Default relation and interface names. If changed, consistency must be kept
# across the provider and requirer.
DEFAULT_RELATION_NAME = "k8s-service-info"
DEFAULT_INTERFACE_NAME = "k8s-service"
REQUIRED_ATTRIBUTES = ["name", "port"]
logger = logging.getLogger(__name__)
class KubernetesServiceInfoRelationError(Exception):
"""Base exception class for any relation error handled by this library."""
pass
class KubernetesServiceInfoRelationMissingError(KubernetesServiceInfoRelationError):
"""Exception to raise when the relation is missing on either end."""
def __init__(self):
self.message = "Missing relation with a k8s service info provider."
super().__init__(self.message)
class KubernetesServiceInfoRelationDataMissingError(KubernetesServiceInfoRelationError):
"""Exception to raise when there is missing data in the relation data bag."""
def __init__(self, message):
self.message = message
super().__init__(self.message)
class KubernetesServiceInfoUpdatedEvent(RelationEvent):
"""Indicates the Kubernetes Service Info data was updated."""
class KubernetesServiceInfoEvents(ObjectEvents):
"""Events for the Kubernetes Service Info library."""
updated = EventSource(KubernetesServiceInfoUpdatedEvent)
class KubernetesServiceInfoObject(BaseModel):
"""Representation of a Kubernetes Service info object.
Args:
name: The name of the Service
port: The port of the Service
"""
name: str
port: str
class KubernetesServiceInfoRequirer(Object):
"""Implement the Requirer end of the Kubernetes Service Info relation.
Observes the relation events and get data of a related application.
This library emits:
* KubernetesServiceInfoUpdatedEvent: when data received on the relation is updated.
Args:
charm (CharmBase): the provider application
refresh_event: (list, optional): list of BoundEvents that this manager should handle.
Use this to update the data sent on this relation on demand.
relation_name (str, optional): the name of the relation
Attributes:
charm (CharmBase): variable for storing the requirer application
relation_name (str): variable for storing the name of the relation
"""
on = KubernetesServiceInfoEvents()
def __init__(
self,
charm: CharmBase,
refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None,
relation_name: Optional[str] = DEFAULT_RELATION_NAME,
):
super().__init__(charm, relation_name)
self._charm = charm
self._relation_name = relation_name
self._requirer_wrapper = KubernetesServiceInfoRequirerWrapper(
self._charm, self._relation_name
)
self.framework.observe(
self._charm.on[self._relation_name].relation_changed, self._on_relation_changed
)
self.framework.observe(
self._charm.on[self._relation_name].relation_broken, self._on_relation_broken
)
if refresh_event:
if not isinstance(refresh_event, (tuple, list)):
refresh_event = [refresh_event]
for evt in refresh_event:
self.framework.observe(evt, self._on_relation_changed)
def get_data(self) -> KubernetesServiceInfoObject:
"""Return a KubernetesServiceInfoObject."""
return self._requirer_wrapper.get_data()
def _on_relation_changed(self, event: BoundEvent) -> None:
"""Handle relation-changed event for this relation."""
self.on.updated.emit(event.relation)
def _on_relation_broken(self, event: BoundEvent) -> None:
"""Handle relation-broken event for this relation."""
self.on.updated.emit(event.relation)
class KubernetesServiceInfoRequirerWrapper(Object):
"""Wrapper for the relation data getting logic.
Args:
charm (CharmBase): the requirer application
relation_name (str, optional): the name of the relation
Attributes:
relation_name (str): variable for storing the name of the relation
"""
def __init__(self, charm, relation_name: Optional[str] = DEFAULT_RELATION_NAME):
super().__init__(charm, relation_name)
self.relation_name = relation_name
@staticmethod
def _validate_relation(relation: Relation) -> None:
"""Series of checks for the relation and relation data.
Args:
relation (Relation): the relation object to run the checks on
Raises:
KubernetesServiceInfoRelationDataMissingError if data is missing or incomplete
KubernetesServiceInfoRelationMissingError: if there is no related application
"""
# Raise if there is no related application
if not relation:
raise KubernetesServiceInfoRelationMissingError()
# Extract remote app information from relation
remote_app = relation.app
# Get relation data from remote app
relation_data = relation.data[remote_app]
# Raise if there is no data found in the relation data bag
if not relation_data:
raise KubernetesServiceInfoRelationDataMissingError(
f"No data found in relation {relation.name} data bag."
)
# Check if the relation data contains the expected attributes
missing_attributes = [
attribute for attribute in REQUIRED_ATTRIBUTES if attribute not in relation_data
]
if missing_attributes:
raise KubernetesServiceInfoRelationDataMissingError(
f"Missing attributes: {missing_attributes} in relation {relation.name}"
)
def get_data(self) -> KubernetesServiceInfoObject:
"""Return a KubernetesServiceInfoObject containing Kubernetes Service information.
Raises:
KubernetesServiceInfoRelationDataMissingError: if data is missing entirely or some attributes
KubernetesServiceInfoRelationMissingError: if there is no related application
ops.model.TooManyRelatedAppsError: if there is more than one related application
"""
# Validate relation data
# Raises TooManyRelatedAppsError if related to more than one app
relation = self.model.get_relation(self.relation_name)
self._validate_relation(relation=relation)
# Get relation data from remote app
relation_data = relation.data[relation.app]
return KubernetesServiceInfoObject(name=relation_data["name"], port=relation_data["port"])
class KubernetesServiceInfoProvider(Object):
"""Implement the Provider end of the Kubernetes Service Info relation.
Observes relation events to send data to related applications.
Args:
charm (CharmBase): the provider application
name (str): the name of the Kubernetes Service the provider knows about
port (str): the port number of the Kubernetes Service the provider knows about
refresh_event: (list, optional): list of BoundEvents that this manager should handle. Use this to update
the data sent on this relation on demand.
relation_name (str, optional): the name of the relation
Attributes:
charm (CharmBase): variable for storing the provider application
relation_name (str): variable for storing the name of the relation
"""
def __init__(
self,
charm: CharmBase,
name: str,
port: str,
refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None,
relation_name: Optional[str] = DEFAULT_RELATION_NAME,
):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self._provider_wrapper = KubernetesServiceInfoProviderWrapper(
self.charm, self.relation_name
)
self._svc_name = name
self._svc_port = port
self.framework.observe(self.charm.on.leader_elected, self._send_data)
self.framework.observe(self.charm.on[self.relation_name].relation_created, self._send_data)
if refresh_event:
if not isinstance(refresh_event, (tuple, list)):
refresh_event = [refresh_event]
for evt in refresh_event:
self.framework.observe(evt, self._send_data)
def _send_data(self, _) -> None:
"""Serve as an event handler for sending the Kubernetes Service information."""
self._provider_wrapper.send_data(self._svc_name, self._svc_port)
class KubernetesServiceInfoProviderWrapper(Object):
"""Wrapper for the relation data sending logic.
Args:
charm (CharmBase): the provider application
relation_name (str, optional): the name of the relation
Attributes:
charm (CharmBase): variable for storing the provider application
relation_name (str): variable for storing the name of the relation
"""
def __init__(self, charm: CharmBase, relation_name: Optional[str] = DEFAULT_RELATION_NAME):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
def send_data(
self,
name: str,
port: str,
) -> None:
"""Update the relation data bag with data from a Kubernetes Service.
This method will complete successfully even if there are no related applications.
Args:
name (str): the name of the Kubernetes Service the provider knows about
port (str): the port number of the Kubernetes Service the provider knows about
"""
# Validate unit is leader to send data; otherwise return
if not self.charm.model.unit.is_leader():
logger.info(
"KubernetesServiceInfoProvider handled send_data event when it is not the leader."
"Skipping event - no data sent."
)
# Update the relation data bag with a Kubernetes Service information
relations = self.charm.model.relations[self.relation_name]
# Update relation data
for relation in relations:
relation.data[self.charm.app].update(
{
"name": name,
"port": port,
}
)