Graceful teardown (#103)
* updated restart logic and added stop if not ready

* fixed utest

* fmt and layer fix

* added layer replace

* vbump

* simplified layer-stop mechanism
PietroPasotti authored Nov 21, 2024
1 parent 303d1b7 commit c5abf13
Showing 4 changed files with 230 additions and 58 deletions.
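In short, this commit makes _reconcile gate all config/restart work behind a readiness check, and tear the workload down gracefully otherwise. A condensed sketch of the new flow, reconstructed from the worker.py diff below (logging and the resource-patch guard are omitted):

    def _reconcile(self):
        self._update_cluster_relation()
        if self.is_ready():
            # restart when the config changed, or when some services are down
            if self._update_config() or self._get_services_down():
                self.restart()
        elif self._container.can_connect():
            # not ready: wipe the on-disk configs and stop the workload
            self._wipe_configs()
            self.stop()

Note that the real _update_config returns any() over a tuple of the three update steps: building the tuple first forces all three side-effecting calls to run, whereas a chained `or` would short-circuit after the first step that reports a change.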
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "cosl"
version = "0.0.43"
version = "0.0.44"
authors = [
{ name="sed-i", email="[email protected]" },
]
162 changes: 107 additions & 55 deletions src/cosl/coordinated_workers/worker.py
@@ -408,58 +408,24 @@ def roles(self) -> List[str]:
]
return active_roles

-def _update_config(self) -> None:
+def _update_config(self) -> bool:
"""Update the worker config and restart the workload if necessary."""
-if not self._container.can_connect():
-logger.debug("container cannot connect, skipping update_config.")
-return

-restart = any(
+return any(
(
self._update_tls_certificates(),
self._update_worker_config(),
self._set_pebble_layer(),
)
)

-# we restart in 2 situations:
-# - we need to because our config has changed
-# - some services are not running
-success = True
-if restart:
-logger.debug("Config changed. Restarting worker services...")
-success = self.restart()
-# this can happen if s3 wasn't ready (server gave error) when we processed an earlier event
-# causing the worker service to die on startup (exited quickly with code...)
-# so we try to restart it now.
-# TODO: would be nice if we could be notified of when s3 starts working, so we don't have to
-# wait for an update-status and can listen to that instead.
-else:
-services_not_up = [
-svc.name for svc in self._container.get_services().values() if not svc.is_running()
-]
-if services_not_up:
-logger.debug(
-f"Not all services are running: {services_not_up}. Restarting worker services..."
-)
-success = self.restart()
-
-if not success:
-# this means that we have managed to start the process without pebble errors,
-# but somehow the status is still not "up" after 15m
-# we are going to set blocked status, but we can also log it here
-logger.warning("failed to (re)start the worker services")

def _set_pebble_layer(self) -> bool:
"""Set Pebble layer.
+Assumes that the caller has verified that the worker is ready, i.e.
+that we have a container and a cluster configuration.
Returns: True if Pebble layer was added, otherwise False.
"""
-if not self._container.can_connect():
-return False
-if not self.roles:
-return False

current_plan = self._container.get_plan()
if not (layer := self.pebble_layer):
return False
@@ -505,8 +471,84 @@ def _reconcile(self):
if self.resources_patch and not self.resources_patch.is_ready():
logger.debug("Resource patch not ready yet. Skipping reconciliation step.")
return

self._update_cluster_relation()
-self._update_config()

+if self.is_ready():
+logger.debug("Worker ready. Updating config...")
+
+# we restart in 2 situations:
+# - we need to because our config has changed
+# - some services are not running
+configs_changed = self._update_config()
+success = None
+if configs_changed:
+logger.debug("Config changed. Restarting worker services...")
+success = self.restart()
+
+elif services_down := self._get_services_down():
+logger.debug(f"Some services are down: {services_down}. Restarting worker...")
+success = self.restart()
+
+if success is False:
+# this means that we have managed to start the process without pebble errors,
+# but somehow the status is still not "up" after 15m
+# we are going to set blocked status, but we can also log it here
+logger.warning("failed to (re)start the worker services")
+
+else:
+logger.debug("Worker not ready. Tearing down...")
+
+if self._container.can_connect():
+logger.debug("Wiping configs and stopping workload...")
+self._wipe_configs()
+self.stop()
+
+else:
+logger.debug("Container offline: nothing to teardown.")
+
+def _get_services_down(self) -> List[str]:
+# this can happen if s3 wasn't ready (server gave error) when we processed an earlier event
+# causing the worker service to die on startup (exited quickly with code...)
+# so we try to restart it now.
+# TODO: would be nice if we could be notified of when s3 starts working, so we don't have to
+# wait for an update-status and can listen to that instead.
+return [
+svc.name for svc in self._container.get_services().values() if not svc.is_running()
+]
+
+def _wipe_configs(self):
+"""Delete all configuration files on disk, purely for hygiene."""
+for config_file in (
+KEY_FILE,
+CLIENT_CA_FILE,
+CERT_FILE,
+S3_TLS_CA_CHAIN_FILE,
+ROOT_CA_CERT,
+CONFIG_FILE,
+):
+self._container.remove_path(config_file, recursive=True)
+
+logger.debug("wiped all configs")
+
+def stop(self):
+"""Stop the workload and tell pebble to not restart it.
+Assumes that pebble can connect.
+"""
+# we might be unable to retrieve self.pebble_layer if something goes wrong generating it
+# for example because we're being torn down and the environment is being weird
+services = tuple(
+self.pebble_layer.services
+if self.pebble_layer
+else self._container.get_plan().services
+)
+
+if not services:
+logger.warning("nothing to stop: no services found in layer or plan")
+return
+
+self._container.stop(*services)

def _update_cluster_relation(self) -> None:
"""Publish all the worker information to relation data."""
@@ -545,26 +587,36 @@ def _running_worker_config(self) -> Optional[Dict[str, Any]]:
)
return None

+def is_ready(self) -> bool:
+"""Check whether the worker has all data it needs to operate."""
+if not self._container.can_connect():
+logger.warning("worker not ready: container cannot connect.")
+return False
+
+elif len(self.roles) == 0:
+logger.warning("worker not ready: role missing or misconfigured.")
+return False
+
+elif not self._worker_config:
+logger.warning("worker not ready: coordinator hasn't published a config")
+return False
+
+else:
+return True

def _update_worker_config(self) -> bool:
"""Set worker config for the workload.
+Assumes that the caller has verified that the worker is ready, i.e.
+that we have a container and a cluster configuration.
Returns: True if config has changed, otherwise False.
Raises: BlockedStatusError exception if PebbleError, ProtocolError, PathError exceptions
are raised by container.remove_path
"""
-if not self._container.can_connect():
-logger.warning("cannot update worker config: container cannot connect.")
-return False
-
-if len(self.roles) == 0:
-logger.warning("cannot update worker config: role missing or misconfigured.")
-return False
-
# fetch the config from the coordinator
worker_config = self._worker_config
if not worker_config:
logger.warning("cannot update worker config: coordinator hasn't published one yet.")
return False

# and compare it against the one on disk (if any)
if self._running_worker_config() != worker_config:
config_as_yaml = yaml.safe_dump(worker_config)
self._container.push(CONFIG_FILE, config_as_yaml, make_dirs=True)
@@ -621,11 +673,11 @@ def _sync_tls_files(self, tls_data: TLSData):
def _update_tls_certificates(self) -> bool:
"""Update the TLS certificates on disk according to their availability.
+Assumes that the caller has verified that the worker is ready, i.e.
+that we have a container and a cluster configuration.
Return True if we need to restart the workload after this update.
"""
-if not self._container.can_connect():
-return False

tls_data = self.cluster.get_tls_data(allow_none=True)
if not tls_data:
return False
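The layer-stop mechanism introduced above boils down to stopping every service Pebble knows about and leaving them down. A minimal standalone sketch of the same pattern (hypothetical snippet, not part of this commit; assumes an ops.Container named container):

    # names of the services in the current Pebble plan (the fallback stop() uses
    # when the worker's own layer cannot be generated)
    services = tuple(container.get_plan().services)
    if services:
        # stop them; Pebble will not bring them back up until explicitly restarted
        container.stop(*services)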
122 changes: 122 additions & 0 deletions tests/test_coordinated_workers/test_worker.py
@@ -102,6 +102,7 @@ def test_roles_from_config(roles_active, roles_inactive, expected):
assert set(mgr.charm.worker.roles) == set(expected)


+@patch.object(Worker, "is_ready", new=lambda _: True)
def test_worker_restarts_if_some_service_not_up(tmp_path):
# GIVEN a worker with some services
MyCharm.layer = ops.pebble.Layer(
@@ -166,6 +167,7 @@ def test_worker_restarts_if_some_service_not_up(tmp_path):
]


+@patch.object(Worker, "is_ready", new=lambda _: True)
def test_worker_does_not_restart_external_services(tmp_path):
# GIVEN a worker with some services and a layer with some other services
MyCharm.layer = ops.pebble.Layer(
@@ -652,3 +654,123 @@ def test_worker_certs_update_only_s3(restart_mock, tmp_path, s3_ca_on_disk):

# AND the worker restarts the workload IF it was not on disk already
assert restart_mock.call_count == (0 if s3_ca_on_disk else 1)


@patch.object(Worker, "restart")
@patch.object(Worker, "stop")
@pytest.mark.parametrize("tls", (True, False))
def test_stop_called_on_no_cluster(stop_mock, restart_mock, tmp_path, tls):
# GIVEN a worker who's all happy to begin with
ctx = testing.Context(
MyCharm,
meta={
"name": "foo",
"requires": {"cluster": {"interface": "cluster"}},
"containers": {"foo": {"type": "oci-image"}},
},
config={"options": {"role-all": {"type": "boolean", "default": True}}},
)
cert = tmp_path / "cert.cert"
key = tmp_path / "key.key"
client_ca = tmp_path / "client_ca.cert"
s3_ca_chain = tmp_path / "s3_ca_chain.cert"

if tls:
s3_ca_chain.write_text("something_tls")
cert.write_text("something_tls")
key.write_text("something_tls")
client_ca.write_text("something_tls")

container = testing.Container(
"foo",
can_connect=True,
execs={testing.Exec(("update-ca-certificates", "--fresh"))},
mounts={
"cert": testing.Mount(location=CERT_FILE, source=cert),
"key": testing.Mount(location=KEY_FILE, source=key),
"client_ca": testing.Mount(location=CLIENT_CA_FILE, source=client_ca),
"s3_ca_chain": testing.Mount(location=S3_TLS_CA_CHAIN_FILE, source=s3_ca_chain),
},
)

# WHEN the charm receives any event
ctx.run(
ctx.on.update_status(),
testing.State(leader=True, containers={container}),
)

fs = container.get_filesystem(ctx)
# THEN the worker wipes all certificates if they are there
assert not fs.joinpath(CERT_FILE).exists()
assert not fs.joinpath(KEY_FILE).exists()
assert not fs.joinpath(CLIENT_CA_FILE).exists()
assert not fs.joinpath(S3_TLS_CA_CHAIN_FILE).exists()

# AND the worker stops the workload instead of restarting it
assert not restart_mock.called
assert stop_mock.called


@patch.object(Worker, "is_ready", new=lambda _: False)
def test_worker_stop_all_services_if_not_ready(tmp_path):
# GIVEN a worker with some services
MyCharm.layer = ops.pebble.Layer(
{
"services": {
"foo": {
"summary": "foos all the things",
"description": "bar",
"startup": "enabled",
"override": "merge",
"command": "ls -la",
},
"bar": {
"summary": "bars the foos",
"description": "bar",
"startup": "enabled",
"command": "exit 1",
},
"baz": {
"summary": "bazzes all of the bars",
"description": "bar",
"startup": "enabled",
"command": "echo hi",
},
}
}
)
ctx = testing.Context(
MyCharm,
meta={
"name": "foo",
"requires": {"cluster": {"interface": "cluster"}},
"containers": {"foo": {"type": "oci-image"}},
},
config={"options": {"role-all": {"type": "boolean", "default": True}}},
)
# WHEN the charm receives any event, but it is not ready
cfg = tmp_path / "cfg.yaml"
cfg.write_text("some: yaml")
container = testing.Container(
"foo",
layers={"base": MyCharm.layer},
can_connect=True,
mounts={"local": testing.Mount(location=CONFIG_FILE, source=cfg)},
execs={
testing.Exec(("update-ca-certificates", "--fresh")),
testing.Exec(("/bin/foo", "-version"), stdout="foo"),
},
service_statuses={
"foo": ops.pebble.ServiceStatus.ACTIVE,
"bar": ops.pebble.ServiceStatus.ACTIVE,
"baz": ops.pebble.ServiceStatus.INACTIVE,
},
)
state_out = ctx.run(ctx.on.pebble_ready(container), testing.State(containers={container}))

+# THEN the charm stops all the services
+container_out = state_out.get_container("foo")
+service_statuses = container_out.service_statuses.values()
+assert all(svc is ops.pebble.ServiceStatus.INACTIVE for svc in service_statuses), [
+stat.value for stat in service_statuses
+]
2 changes: 0 additions & 2 deletions tests/test_coordinated_workers/test_worker_status.py
@@ -153,8 +153,6 @@ def test_status_check_no_pebble(ctx, base_state, caplog):

# THEN the charm sets blocked
assert state_out.unit_status == ops.WaitingStatus("Waiting for `workload` container")
-# AND THEN the charm logs that the container isn't ready.
-assert "container cannot connect, skipping update_config." in caplog.messages


@k8s_patch()
