generated from canonical/template-operator
-
Notifications
You must be signed in to change notification settings - Fork 27
/
charm.py
executable file
·77 lines (61 loc) · 2.54 KB
/
charm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python3
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
from pathlib import Path
from charms.observability_libs.v1.kubernetes_service_patch import (
KubernetesServicePatch,
ServicePort,
)
from charms.traefik_k8s.v1.ingress_per_unit import IngressPerUnitRequirer
from ops.charm import CharmBase, PebbleReadyEvent
from ops.model import ActiveStatus, Container, WaitingStatus
from ops.pebble import Layer
class TCPRequirerMock(CharmBase):
    """Mock charm that runs a small TCP workload and requests per-unit ingress.

    On construction it patches the Kubernetes service to expose ``_tcp_port``
    and registers handlers for the ``tcp-server`` container's pebble-ready
    event and for creation of the ``ingress-per-unit`` relation.
    """

    _tcp_port = 9999  # port is hard-coded in workload.py

    def __init__(self, framework):
        """Wire up the service patch and the event observers.

        Args:
            framework: the ops framework instance passed in by ``ops.main``.
        """
        super().__init__(framework)

        # Expose the workload port on the service; the port is named after the app.
        self.service_patch = KubernetesServicePatch(
            charm=self,
            ports=[ServicePort(self._tcp_port, name=f"{self.app.name}")],
        )

        # Set unconditionally every time the charm object is constructed.
        self.unit.status = ActiveStatus("ready")

        # dummy charm: only create the relation AFTER pebble ready has fired.
        # NOTE(review): the requirer object itself is built in `_ipu_created`,
        # not here - presumably on purpose for this mock; confirm before moving it.
        self.framework.observe(self.on.ingress_per_unit_relation_created, self._ipu_created)
        self.framework.observe(self.on.tcp_server_pebble_ready, self._pebble_ready)

    def _pebble_ready(self, event: PebbleReadyEvent):
        """Push the workload script into the container and (re)start it.

        If the container cannot be reached yet, the event is deferred and the
        unit is put into ``WaitingStatus``; otherwise the unit ends up active.

        Args:
            event: the pebble-ready event carrying the workload container.
        """
        container: Container = event.workload
        if not container.can_connect():
            event.defer()
            self.unit.status = WaitingStatus(
                "Pending webserver restart; waiting for workload container"
            )
            return

        # The workload script ships next to this file inside the charm.
        workload_file = Path(__file__).parent / "workload.py"
        # Read explicitly as UTF-8 so the result does not depend on the locale.
        with open(workload_file, "r", encoding="utf-8") as workload_source:
            print("pushing webserver source...")
            container.push("/workload.py", workload_source)

        new_layer = Layer(
            {
                "summary": "webserver layer",
                "description": "pebble config layer for workload",
                "services": {
                    "workload": {
                        "override": "replace",
                        "summary": "workload",
                        # Absolute path: the script was pushed to /workload.py,
                        # so do not rely on the service's working directory.
                        "command": "python /workload.py",
                        "startup": "enabled",
                    }
                },
            }
        )
        container.add_layer("webserver", new_layer, combine=True)
        print("Added updated layer 'webserver' to Pebble plan")

        container.replan()
        print("restarted webserver service")

        self.unit.status = ActiveStatus()

    def _ipu_created(self, _event):
        """Create a TCP-mode ingress requirer and publish this unit's port.

        Args:
            _event: the relation-created event (unused).
        """
        ipu = IngressPerUnitRequirer(self, mode="tcp")
        ipu.provide_ingress_requirements(port=self._tcp_port)
if __name__ == "__main__":
    # Script entry point: import the ops dispatcher only when executed directly,
    # then hand it the charm class to run.
    from ops.main import main as run_charm

    run_charm(TCPRequirerMock)