From c5abf1390d677d06df80fc87fbbd60de79995cbe Mon Sep 17 00:00:00 2001
From: PietroPasotti
Date: Thu, 21 Nov 2024 10:45:58 +0100
Subject: [PATCH] Graceful teardown (#103)

* updated restart logic and added stop if not ready
* fixed utest
* fmt and layer fix
* added layer replace
* vbump
* simplified layer-stop mechanism
---
 pyproject.toml                                |   2 +-
 src/cosl/coordinated_workers/worker.py        | 162 ++++++++++++------
 tests/test_coordinated_workers/test_worker.py | 122 +++++++++++++
 .../test_worker_status.py                     |   2 -
 4 files changed, 230 insertions(+), 58 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index c4ebfcc..07f6ae6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "cosl"
-version = "0.0.43"
+version = "0.0.44"
 authors = [
   { name="sed-i", email="82407168+sed-i@users.noreply.github.com" },
 ]
diff --git a/src/cosl/coordinated_workers/worker.py b/src/cosl/coordinated_workers/worker.py
index 4674675..571889a 100644
--- a/src/cosl/coordinated_workers/worker.py
+++ b/src/cosl/coordinated_workers/worker.py
@@ -408,13 +408,9 @@ def roles(self) -> List[str]:
         ]
         return active_roles
 
-    def _update_config(self) -> None:
+    def _update_config(self) -> bool:
         """Update the worker config and restart the workload if necessary."""
-        if not self._container.can_connect():
-            logger.debug("container cannot connect, skipping update_config.")
-            return
-
-        restart = any(
+        return any(
             (
                 self._update_tls_certificates(),
                 self._update_worker_config(),
@@ -422,44 +418,14 @@ def _update_config(self) -> None:
             )
         )
 
-        # we restart in 2 situations:
-        # - we need to because our config has changed
-        # - some services are not running
-        success = True
-        if restart:
-            logger.debug("Config changed. Restarting worker services...")
-            success = self.restart()
-        # this can happen if s3 wasn't ready (server gave error) when we processed an earlier event
-        # causing the worker service to die on startup (exited quickly with code...)
-        # so we try to restart it now.
-        # TODO: would be nice if we could be notified of when s3 starts working, so we don't have to
-        # wait for an update-status and can listen to that instead.
-        else:
-            services_not_up = [
-                svc.name for svc in self._container.get_services().values() if not svc.is_running()
-            ]
-            if services_not_up:
-                logger.debug(
-                    f"Not all services are running: {services_not_up}. Restarting worker services..."
-                )
-                success = self.restart()
-
-        if not success:
-            # this means that we have managed to start the process without pebble errors,
-            # but somehow the status is still not "up" after 15m
-            # we are going to set blocked status, but we can also log it here
-            logger.warning("failed to (re)start the worker services")
-
     def _set_pebble_layer(self) -> bool:
         """Set Pebble layer.
 
+        Assumes that the caller has verified that the worker is ready, i.e.
+        that we have a container and a cluster configuration.
+
         Returns: True if Pebble layer was added, otherwise False.
         """
-        if not self._container.can_connect():
-            return False
-        if not self.roles:
-            return False
-
         current_plan = self._container.get_plan()
         if not (layer := self.pebble_layer):
             return False
@@ -505,8 +471,84 @@ def _reconcile(self):
         if self.resources_patch and not self.resources_patch.is_ready():
             logger.debug("Resource patch not ready yet. Skipping reconciliation step.")
             return
+
         self._update_cluster_relation()
-        self._update_config()
+
+        if self.is_ready():
+            logger.debug("Worker ready. Updating config...")
Updating config...") + + # we restart in 2 situations: + # - we need to because our config has changed + # - some services are not running + configs_changed = self._update_config() + success = None + if configs_changed: + logger.debug("Config changed. Restarting worker services...") + success = self.restart() + + elif services_down := self._get_services_down(): + logger.debug(f"Some services are down: {services_down}. Restarting worker...") + success = self.restart() + + if success is False: + # this means that we have managed to start the process without pebble errors, + # but somehow the status is still not "up" after 15m + # we are going to set blocked status, but we can also log it here + logger.warning("failed to (re)start the worker services") + + else: + logger.debug("Worker not ready. Tearing down...") + + if self._container.can_connect(): + logger.debug("Wiping configs and stopping workload...") + self._wipe_configs() + self.stop() + + else: + logger.debug("Container offline: nothing to teardown.") + + def _get_services_down(self) -> List[str]: + # this can happen if s3 wasn't ready (server gave error) when we processed an earlier event + # causing the worker service to die on startup (exited quickly with code...) + # so we try to restart it now. + # TODO: would be nice if we could be notified of when s3 starts working, so we don't have to + # wait for an update-status and can listen to that instead. + return [ + svc.name for svc in self._container.get_services().values() if not svc.is_running() + ] + + def _wipe_configs(self): + """Delete all configuration files on disk, purely for hygiene.""" + for config_file in ( + KEY_FILE, + CLIENT_CA_FILE, + CERT_FILE, + S3_TLS_CA_CHAIN_FILE, + ROOT_CA_CERT, + CONFIG_FILE, + ): + self._container.remove_path(config_file, recursive=True) + + logger.debug("wiped all configs") + + def stop(self): + """Stop the workload and tell pebble to not restart it. + + Assumes that pebble can connect. + """ + # we might be unable to retrieve self.pebble_layer if something goes wrong generating it + # for example because we're being torn down and the environment is being weird + services = tuple( + self.pebble_layer.services + if self.pebble_layer + else self._container.get_plan().services + ) + + if not services: + logger.warning("nothing to stop: no services found in layer or plan") + return + + self._container.stop(*services) def _update_cluster_relation(self) -> None: """Publish all the worker information to relation data.""" @@ -545,26 +587,36 @@ def _running_worker_config(self) -> Optional[Dict[str, Any]]: ) return None + def is_ready(self) -> bool: + """Check whether the worker has all data it needs to operate.""" + if not self._container.can_connect(): + logger.warning("worker not ready: container cannot connect.") + return False + + elif len(self.roles) == 0: + logger.warning("worker not ready: role missing or misconfigured.") + return False + + elif not self._worker_config: + logger.warning("worker not ready: coordinator hasn't published a config") + return False + + else: + return True + def _update_worker_config(self) -> bool: """Set worker config for the workload. + Assumes that the caller has verified that the worker is ready, i.e. + that we have a container and a cluster configuration. + Returns: True if config has changed, otherwise False. 
         Raises: BlockedStatusError exception if PebbleError, ProtocolError, PathError exceptions
             are raised by container.remove_path
         """
-        if not self._container.can_connect():
-            logger.warning("cannot update worker config: container cannot connect.")
-            return False
-
-        if len(self.roles) == 0:
-            logger.warning("cannot update worker config: role missing or misconfigured.")
-            return False
-
+        # fetch the config from the coordinator
         worker_config = self._worker_config
-        if not worker_config:
-            logger.warning("cannot update worker config: coordinator hasn't published one yet.")
-            return False
-
+        # and compare it against the one on disk (if any)
         if self._running_worker_config() != worker_config:
             config_as_yaml = yaml.safe_dump(worker_config)
             self._container.push(CONFIG_FILE, config_as_yaml, make_dirs=True)
@@ -621,11 +673,11 @@ def _sync_tls_files(self, tls_data: TLSData):
     def _update_tls_certificates(self) -> bool:
         """Update the TLS certificates on disk according to their availability.
 
+        Assumes that the caller has verified that the worker is ready, i.e.
+        that we have a container and a cluster configuration.
+
         Return True if we need to restart the workload after this update.
         """
-        if not self._container.can_connect():
-            return False
-
         tls_data = self.cluster.get_tls_data(allow_none=True)
         if not tls_data:
             return False
diff --git a/tests/test_coordinated_workers/test_worker.py b/tests/test_coordinated_workers/test_worker.py
index 105d67a..9bc31ea 100644
--- a/tests/test_coordinated_workers/test_worker.py
+++ b/tests/test_coordinated_workers/test_worker.py
@@ -102,6 +102,7 @@ def test_roles_from_config(roles_active, roles_inactive, expected):
         assert set(mgr.charm.worker.roles) == set(expected)
 
 
+@patch.object(Worker, "is_ready", new=lambda _: True)
 def test_worker_restarts_if_some_service_not_up(tmp_path):
     # GIVEN a worker with some services
     MyCharm.layer = ops.pebble.Layer(
@@ -166,6 +167,7 @@ def test_worker_does_not_restart_external_services(tmp_path):
 
 
+@patch.object(Worker, "is_ready", new=lambda _: True)
 def test_worker_does_not_restart_external_services(tmp_path):
     # GIVEN a worker with some services and a layer with some other services
     MyCharm.layer = ops.pebble.Layer(
@@ -652,3 +654,123 @@ def test_worker_certs_update_only_s3(restart_mock, tmp_path, s3_ca_on_disk):
 
     # AND the worker restarts the workload IF it was not on disk already
     assert restart_mock.call_count == (0 if s3_ca_on_disk else 1)
+
+
+@patch.object(Worker, "restart")
+@patch.object(Worker, "stop")
+@pytest.mark.parametrize("tls", (True, False))
+def test_stop_called_on_no_cluster(stop_mock, restart_mock, tmp_path, tls):
+    # GIVEN a worker who's all happy to begin with
+    ctx = testing.Context(
+        MyCharm,
+        meta={
+            "name": "foo",
+            "requires": {"cluster": {"interface": "cluster"}},
+            "containers": {"foo": {"type": "oci-image"}},
+        },
+        config={"options": {"role-all": {"type": "boolean", "default": True}}},
+    )
+    cert = tmp_path / "cert.cert"
+    key = tmp_path / "key.key"
+    client_ca = tmp_path / "client_ca.cert"
+    s3_ca_chain = tmp_path / "s3_ca_chain.cert"
+
+    if tls:
+        s3_ca_chain.write_text("something_tls")
+        cert.write_text("something_tls")
+        key.write_text("something_tls")
+        client_ca.write_text("something_tls")
+
+    container = testing.Container(
+        "foo",
+        can_connect=True,
+        execs={testing.Exec(("update-ca-certificates", "--fresh"))},
+        mounts={
+            "cert": testing.Mount(location=CERT_FILE, source=cert),
+            "key": testing.Mount(location=KEY_FILE, source=key),
+            "client_ca": testing.Mount(location=CLIENT_CA_FILE, source=client_ca),
+            "s3_ca_chain": testing.Mount(location=S3_TLS_CA_CHAIN_FILE, source=s3_ca_chain),
+        },
+    )
+
+    # WHEN the charm receives any event
+    ctx.run(
+        ctx.on.update_status(),
+        testing.State(leader=True, containers={container}),
+    )
+
+    fs = container.get_filesystem(ctx)
+    # THEN the worker wipes all certificates if they are there
+    assert not fs.joinpath(CERT_FILE).exists()
+    assert not fs.joinpath(KEY_FILE).exists()
+    assert not fs.joinpath(CLIENT_CA_FILE).exists()
+    assert not fs.joinpath(S3_TLS_CA_CHAIN_FILE).exists()
+
+    # AND the worker stops the workload instead of restarting it
+    assert not restart_mock.called
+    assert stop_mock.called
+
+
+@patch.object(Worker, "is_ready", new=lambda _: False)
+def test_worker_stop_all_services_if_not_ready(tmp_path):
+    # GIVEN a worker with some services
+    MyCharm.layer = ops.pebble.Layer(
+        {
+            "services": {
+                "foo": {
+                    "summary": "foos all the things",
+                    "description": "bar",
+                    "startup": "enabled",
+                    "override": "merge",
+                    "command": "ls -la",
+                },
+                "bar": {
+                    "summary": "bars the foos",
+                    "description": "bar",
+                    "startup": "enabled",
+                    "command": "exit 1",
+                },
+                "baz": {
+                    "summary": "bazzes all of the bars",
+                    "description": "bar",
+                    "startup": "enabled",
+                    "command": "echo hi",
+                },
+            }
+        }
+    )
+    ctx = testing.Context(
+        MyCharm,
+        meta={
+            "name": "foo",
+            "requires": {"cluster": {"interface": "cluster"}},
+            "containers": {"foo": {"type": "oci-image"}},
+        },
+        config={"options": {"role-all": {"type": "boolean", "default": True}}},
+    )
+    # WHEN the charm receives any event, but it is not ready
+    cfg = tmp_path / "cfg.yaml"
+    cfg.write_text("some: yaml")
+    container = testing.Container(
+        "foo",
+        layers={"base": MyCharm.layer},
+        can_connect=True,
+        mounts={"local": testing.Mount(location=CONFIG_FILE, source=cfg)},
+        execs={
+            testing.Exec(("update-ca-certificates", "--fresh")),
+            testing.Exec(("/bin/foo", "-version"), stdout="foo"),
+        },
+        service_statuses={
+            "foo": ops.pebble.ServiceStatus.ACTIVE,
+            "bar": ops.pebble.ServiceStatus.ACTIVE,
+            "baz": ops.pebble.ServiceStatus.INACTIVE,
+        },
+    )
+    state_out = ctx.run(ctx.on.pebble_ready(container), testing.State(containers={container}))
+
+    # THEN the charm stops all the services
+    container_out = state_out.get_container("foo")
+    service_statuses = container_out.service_statuses.values()
+    assert all(svc is ops.pebble.ServiceStatus.INACTIVE for svc in service_statuses), [
+        stat.value for stat in service_statuses
+    ]
diff --git a/tests/test_coordinated_workers/test_worker_status.py b/tests/test_coordinated_workers/test_worker_status.py
index e7cd631..3ffe71f 100644
--- a/tests/test_coordinated_workers/test_worker_status.py
+++ b/tests/test_coordinated_workers/test_worker_status.py
@@ -153,8 +153,6 @@ def test_status_check_no_pebble(ctx, base_state, caplog):
 
     # THEN the charm sets blocked
     assert state_out.unit_status == ops.WaitingStatus("Waiting for `workload` container")
-
-    # AND THEN the charm logs that the container isn't ready.
-    assert "container cannot connect, skipping update_config." in caplog.messages
 
 
 @k8s_patch()
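
Editor's note: the behavioural core of this patch is the new branch in `_reconcile`: when the worker is ready, push config changes and restart the workload if the config changed or some services are down; when it is not ready, wipe the on-disk configs and stop the workload instead of leaving stale services running. The sketch below mirrors that decision flow with plain callables so it can be read and run outside the charm. It is illustrative only, under the assumption that the reader just wants the control flow: `reconcile_sketch` and its stub arguments are made up for this note and are not part of cosl, ops, or the Pebble API.

from typing import Callable, List


def reconcile_sketch(
    is_ready: Callable[[], bool],
    update_config: Callable[[], bool],
    services_down: Callable[[], List[str]],
    restart: Callable[[], bool],
    wipe_configs: Callable[[], None],
    stop: Callable[[], None],
    can_connect: Callable[[], bool],
) -> None:
    """Simplified mirror of the reconcile flow introduced by this patch."""
    if is_ready():
        # ready: push config changes, then restart if the config changed
        # or if some services are found not running
        success = None
        if update_config():
            success = restart()
        elif down := services_down():
            print(f"services down: {down}, restarting")
            success = restart()
        if success is False:
            print("failed to (re)start the worker services")
    else:
        # not ready (no container, no roles, or no coordinator config):
        # tear down gracefully rather than restarting
        if can_connect():
            wipe_configs()
            stop()


if __name__ == "__main__":
    # toy run: a worker that is not ready tears down instead of restarting
    reconcile_sketch(
        is_ready=lambda: False,
        update_config=lambda: False,
        services_down=lambda: [],
        restart=lambda: True,
        wipe_configs=lambda: print("wiping configs"),
        stop=lambda: print("stopping workload"),
        can_connect=lambda: True,
    )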