From 15c73684959b163bc3f49af0e99b6fd5e896ce4d Mon Sep 17 00:00:00 2001 From: Carl Csaposs Date: Wed, 17 Apr 2024 12:49:53 +0000 Subject: [PATCH] Use OpenSearch for locking with fallback to peer databag (#211) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Issue ### Fix issues with current lock implementation - Fixes #183: Units can start/restart without rollingops lock (if event deferred, lock released but unit (re)starts on next event anyways). - Unit can scale down (using "ops lock") while another unit is (re)starting (using separate rollingops lock) ### Prepare for in-place upgrades - In order to maintain HA, only one unit should start, restart, join cluster (scale up), leave cluster (scale down), or upgrade at a time - Upgrades adds an additional challenge: if the charm code is upgraded (in addition to the workload), it's possible for the leader unit to go into error state—which prevents coordination of locks via peer databag (current rolling ops implementation) and can block rollback (for other units) ## Options considered 1. Use peer relation databag for lock - Unit requests lock by adding data to unit databag -> relation-changed event triggered on leader - Leader grants lock by adding data to app databag -> relation-changed event triggered on unit - Unit proceeds & releases lock from unit databag 2. Don't use lock for upgrades. Make upgrade order deterministic (so that each unit can independently determine upgrade order & upgrade order does not change) and each unit upgrades when it sees the units before it have upgraded 3. Use opensearch index/document for lock 4. Option 3 but fallback to option 1 if ~~no units online~~ less than 2 units online Cons of each option: 1. if leader unit is in error state (raises uncaught exception), rollback will be blocked for all units 2. a unit can restart for non-upgrade related reasons (role re-balancing from network cut or scale up) while another unit is restarting to upgrade 3. doesn't work if all units offline (initial start, full cluster crash/network cut) 4. implementation complexity, some edge cases not supported e.g. https://github.com/canonical/opensearch-operator/blob/b1b28c10f2bd70bee4270707b616d8d565b8b616/lib/charms/opensearch/v0/opensearch_locking.py#L227-L228 Pros of each option: 1. only one unit will (re)start at a time, for any reason 2. rollback with `juju refresh` will immediately rollback highest unit even if leader/other units in error state 3. only one unit will (re)start at a time, for any reason and rollback with `juju refresh` will quickly rollback highest unit even if leader unit charm code in error state 4. same as 3 and locking (mostly, except aforementioned edge cases) works when all units offline More context: Discussion: https://chat.canonical.com/canonical/pl/9fah5tfxd38ybnx3tq7zugxfyh Option 1 in discussion is option 2 here, option 2 in discussion is option 1 here **Option chosen:** Option 4 ### Opensearch index vs document for lock Current "ops lock" implementation with opensearch index: Each unit requests the lock by trying to create an index. If the index does not exist, the "lock" is granted. However, if a unit requests the lock, charm goes into error state, and error state is resolved (e.g. after rollback) it will not be able to use the lock—no unit will be aware that it has the lock and no unit will be able to release the lock Solution: use document id 0 that stores "unit-name" as lock Discussion: https://chat.canonical.com/canonical/pl/biddxzzk3fbpjgbhmatzr8n6bw ## Solution ### Design (Option 4): Use opensearch document as lock (for any (re)start, join cluster, leave cluster, or upgrade). Fallback to peer databag if all units offline. ### Implementation Create custom events `_StartOpensearch` and `_RestartOpensearch` https://github.com/canonical/opensearch-operator/blob/b1b28c10f2bd70bee4270707b616d8d565b8b616/lib/charms/opensearch/v0/opensearch_base_charm.py#L121-L132 When opensearch should be (re)started, emit the custom event. Custom event requests the lock. If granted, it (re)starts opensearch. Once opensearch fully ready, the lock is released. If opensearch fails to start, the lock is released. While opensearch is starting or while the lock is not granted, the custom event will be continually deferred. Note: the original event is not deferred—only the custom event. This is so that any logic that ran before the request to (re)start opensearch does not get re-ran. By requesting the lock within the custom event, and attempting to reacquire the lock each time the custom event runs (i.e. after every time it's deferred), we solve the design issue with rollingops and deferred events detailed in #183 --- lib/charms/opensearch/v0/helper_cluster.py | 23 +- .../opensearch/v0/opensearch_base_charm.py | 110 ++--- .../opensearch/v0/opensearch_exceptions.py | 4 - .../opensearch/v0/opensearch_locking.py | 324 +++++++++++--- lib/charms/rolling_ops/v0/rollingops.py | 415 ------------------ metadata.yaml | 4 +- tests/unit/lib/test_backups.py | 1 - tests/unit/lib/test_helper_cluster.py | 141 +++--- tests/unit/lib/test_ml_plugins.py | 11 +- tests/unit/lib/test_opensearch_base_charm.py | 7 +- .../lib/test_opensearch_relation_provider.py | 6 +- tests/unit/lib/test_opensearch_secrets.py | 6 +- 12 files changed, 415 insertions(+), 637 deletions(-) delete mode 100644 lib/charms/rolling_ops/v0/rollingops.py diff --git a/lib/charms/opensearch/v0/helper_cluster.py b/lib/charms/opensearch/v0/helper_cluster.py index 58bf0dbab..b7195f883 100644 --- a/lib/charms/opensearch/v0/helper_cluster.py +++ b/lib/charms/opensearch/v0/helper_cluster.py @@ -46,14 +46,8 @@ def suggest_roles(nodes: List[Node], planned_units: int) -> List[str]: — odd: "all" the nodes are cm_eligible nodes. — even: "all - 1" are cm_eligible and 1 data node. """ - max_cms = ClusterTopology.max_cluster_manager_nodes(planned_units) - - base_roles = ["data", "ingest", "ml", "coordinating_only"] - full_roles = base_roles + ["cluster_manager"] - nodes_by_roles = ClusterTopology.nodes_count_by_role(nodes) - if nodes_by_roles.get("cluster_manager", 0) == max_cms: - return base_roles - return full_roles + # TODO: remove in https://github.com/canonical/opensearch-operator/issues/230 + return ["data", "ingest", "ml", "coordinating_only", "cluster_manager"] @staticmethod def get_cluster_settings( @@ -74,6 +68,7 @@ def get_cluster_settings( @staticmethod def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]: + # TODO: remove in https://github.com/canonical/opensearch-operator/issues/230 """Recompute the configuration of all the nodes (cluster set to auto-generate roles).""" if not nodes: return {} @@ -86,19 +81,11 @@ def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]: else: # Leave node unchanged nodes_by_name[node.name] = node - base_roles = ["data", "ingest", "ml", "coordinating_only"] - full_roles = base_roles + ["cluster_manager"] - highest_unit_number = max(node.unit_number for node in current_cluster_nodes) for node in current_cluster_nodes: - # we do this in order to remove any non-default role / add any missing default role - if len(current_cluster_nodes) % 2 == 0 and node.unit_number == highest_unit_number: - roles = base_roles - else: - roles = full_roles - nodes_by_name[node.name] = Node( name=node.name, - roles=roles, + # we do this in order to remove any non-default role / add any missing default role + roles=["data", "ingest", "ml", "coordinating_only", "cluster_manager"], ip=node.ip, app_name=node.app_name, unit_number=node.unit_number, diff --git a/lib/charms/opensearch/v0/opensearch_base_charm.py b/lib/charms/opensearch/v0/opensearch_base_charm.py index 4b41111cb..cb8eac84c 100644 --- a/lib/charms/opensearch/v0/opensearch_base_charm.py +++ b/lib/charms/opensearch/v0/opensearch_base_charm.py @@ -66,7 +66,7 @@ from charms.opensearch.v0.opensearch_fixes import OpenSearchFixes from charms.opensearch.v0.opensearch_health import HealthColors, OpenSearchHealth from charms.opensearch.v0.opensearch_internal_data import RelationDataStore, Scope -from charms.opensearch.v0.opensearch_locking import OpenSearchOpsLock +from charms.opensearch.v0.opensearch_locking import OpenSearchNodeLock from charms.opensearch.v0.opensearch_nodes_exclusions import ( ALLOCS_TO_DELETE, VOTING_TO_DELETE, @@ -87,7 +87,6 @@ from charms.opensearch.v0.opensearch_secrets import OpenSearchSecrets from charms.opensearch.v0.opensearch_tls import OpenSearchTLS from charms.opensearch.v0.opensearch_users import OpenSearchUserManager -from charms.rolling_ops.v0.rollingops import RollingOpsManager from charms.tls_certificates_interface.v3.tls_certificates import ( CertificateAvailableEvent, ) @@ -105,7 +104,7 @@ StorageDetachingEvent, UpdateStatusEvent, ) -from ops.framework import EventBase +from ops.framework import EventBase, EventSource from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed @@ -127,9 +126,26 @@ logger = logging.getLogger(__name__) +class _StartOpenSearch(EventBase): + """Attempt to acquire lock & start OpenSearch. + + This event will be deferred until OpenSearch starts. + """ + + +class _RestartOpenSearch(EventBase): + """Attempt to acquire lock & restart OpenSearch. + + This event will be deferred until OpenSearch stops. Then, `_StartOpenSearch` will be emitted. + """ + + class OpenSearchBaseCharm(CharmBase): """Base class for OpenSearch charms.""" + _start_opensearch_event = EventSource(_StartOpenSearch) + _restart_opensearch_event = EventSource(_RestartOpenSearch) + def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): super().__init__(*args) @@ -147,7 +163,7 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): self.tls = OpenSearchTLS(self, TLS_RELATION) self.status = Status(self) self.health = OpenSearchHealth(self) - self.ops_lock = OpenSearchOpsLock(self) + self.node_lock = OpenSearchNodeLock(self) self.cos_integration = COSAgentProvider( self, relation_name=COSRelationName, @@ -161,14 +177,14 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None): self.plugin_manager = OpenSearchPluginManager(self) self.backup = OpenSearchBackup(self) - self.service_manager = RollingOpsManager( - self, relation=SERVICE_MANAGER, callback=self._start_opensearch - ) self.user_manager = OpenSearchUserManager(self) self.opensearch_provider = OpenSearchProvider(self) self.peer_cluster_provider = OpenSearchPeerClusterProvider(self) self.peer_cluster_requirer = OpenSearchPeerClusterRequirer(self) + self.framework.observe(self._start_opensearch_event, self._start_opensearch) + self.framework.observe(self._restart_opensearch_event, self._restart_opensearch) + self.framework.observe(self.on.leader_elected, self._on_leader_elected) self.framework.observe(self.on.start, self._on_start) self.framework.observe(self.on.update_status, self._on_update_status) @@ -271,7 +287,7 @@ def _on_start(self, event: StartEvent): # request the start of OpenSearch self.status.set(WaitingStatus(RequestUnitServiceOps.format("start"))) - self.on[self.service_manager.name].acquire_lock.emit(callback_override="_start_opensearch") + self._start_opensearch_event.emit() def _apply_peer_cm_directives_and_check_if_can_start(self) -> bool: """Apply the directives computed by the opensearch peer cluster manager.""" @@ -404,7 +420,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent): def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # noqa: C901 """Triggered when removing unit, Prior to the storage being detached.""" # acquire lock to ensure only 1 unit removed at a time - self.ops_lock.acquire() + if not self.node_lock.acquired: + # Raise uncaught exception to prevent Juju from removing unit + raise Exception("Unable to acquire lock: Another unit is starting or stopping.") # if the leader is departing, and this hook fails "leader elected" won"t trigger, # so we want to re-balance the node roles from here @@ -446,7 +464,7 @@ def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # no raise OpenSearchHAError(ClusterHealthUnknown) finally: # release lock - self.ops_lock.release() + self.node_lock.release() def _on_update_status(self, event: UpdateStatusEvent): """On update status event. @@ -524,9 +542,7 @@ def _on_config_changed(self, event: ConfigChangedEvent): # noqa C901 if self.unit.is_leader(): self.status.set(MaintenanceStatus(PluginConfigCheck), app=True) if self.plugin_manager.run(): - self.on[self.service_manager.name].acquire_lock.emit( - callback_override="_restart_opensearch" - ) + self._restart_opensearch_event.emit() except (OpenSearchNotFullyReadyError, OpenSearchPluginError) as e: if isinstance(e, OpenSearchNotFullyReadyError): logger.warning("Plugin management: cluster not ready yet at config changed") @@ -618,9 +634,7 @@ def on_tls_conf_set( # In case of renewal of the unit transport layer cert - restart opensearch if renewal and self.is_admin_user_configured() and self.is_tls_fully_configured(): - self.on[self.service_manager.name].acquire_lock.emit( - callback_override="_restart_opensearch" - ) + self._restart_opensearch_event.emit() def on_tls_relation_broken(self, _: RelationBrokenEvent): """As long as all certificates are produced, we don't do anything.""" @@ -699,8 +713,12 @@ def _handle_change_to_main_orchestrator_if_needed( self.tls.request_new_admin_certificate() - def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 + def _start_opensearch(self, event: _StartOpenSearch) -> None: # noqa: C901 """Start OpenSearch, with a generated or passed conf, if all resources configured.""" + if not self.node_lock.acquired: + logger.debug("Lock to start opensearch not acquired. Will retry next event") + event.defer() + return self.peers_data.delete(Scope.UNIT, "started") if self.opensearch.is_started(): try: @@ -710,25 +728,18 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 return if not self._can_service_start(): - self.peers_data.delete(Scope.UNIT, "starting") + self.node_lock.release() event.defer() return - if self.peers_data.get(Scope.UNIT, "starting", False) and self.opensearch.is_failed(): - self.peers_data.delete(Scope.UNIT, "starting") + if self.opensearch.is_failed(): + self.node_lock.release() + self.status.set(BlockedStatus(ServiceStartError)) event.defer() return self.unit.status = WaitingStatus(WaitingToStart) - rel = self.model.get_relation(PeerRelationName) - for unit in rel.units.union({self.unit}): - if rel.data[unit].get("starting") == "True": - event.defer() - return - - self.peers_data.put(Scope.UNIT, "starting", True) - try: # Retrieve the nodes of the cluster, needed to configure this node nodes = self._get_nodes(False) @@ -739,12 +750,12 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 # Set the configuration of the node self._set_node_conf(nodes) except OpenSearchHttpError: - self.peers_data.delete(Scope.UNIT, "starting") + self.node_lock.release() event.defer() return except OpenSearchProvidedRolesException as e: logger.exception(e) - self.peers_data.delete(Scope.UNIT, "starting") + self.node_lock.release() event.defer() self.unit.status = BlockedStatus(str(e)) return @@ -761,7 +772,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901 event.defer() except OpenSearchStartError as e: logger.exception(e) - self.peers_data.delete(Scope.UNIT, "starting") + self.node_lock.release() self.status.set(BlockedStatus(ServiceStartError)) event.defer() @@ -788,8 +799,8 @@ def _post_start_init(self, event: EventBase): # Remove the exclusions that could not be removed when no units were online self.opensearch_exclusions.delete_current() - # Remove the 'starting' flag on the unit - self.peers_data.delete(Scope.UNIT, "starting") + self.node_lock.release() + self.peers_data.put(Scope.UNIT, "started", True) # apply post_start fixes to resolve start related upstream bugs @@ -827,18 +838,23 @@ def _stop_opensearch(self) -> None: # 3. Remove the exclusions self.opensearch_exclusions.delete_current() - def _restart_opensearch(self, event: EventBase) -> None: + def _restart_opensearch(self, event: _RestartOpenSearch) -> None: """Restart OpenSearch if possible.""" - if not self.peers_data.get(Scope.UNIT, "starting", False): - try: - self._stop_opensearch() - except OpenSearchStopError as e: - logger.exception(e) - event.defer() - self.status.set(WaitingStatus(ServiceIsStopping)) - return + if not self.node_lock.acquired: + logger.debug("Lock to restart opensearch not acquired. Will retry next event") + event.defer() + return - self._start_opensearch(event) + try: + self._stop_opensearch() + except OpenSearchStopError as e: + logger.exception(e) + self.node_lock.release() + event.defer() + self.status.set(WaitingStatus(ServiceIsStopping)) + return + + self._start_opensearch_event.emit() def _can_service_start(self) -> bool: """Return if the opensearch service can start.""" @@ -917,9 +933,7 @@ def _remove_data_role_from_dedicated_cm_if_needed( # noqa: C901 return False self.status.set(WaitingStatus(WaitingToStart)) - self.on[self.service_manager.name].acquire_lock.emit( - callback_override="_restart_opensearch" - ) + self._restart_opensearch_event.emit() return True def _purge_users(self): @@ -1148,9 +1162,7 @@ def _reconfigure_and_restart_unit_if_needed(self): return self.status.set(WaitingStatus(WaitingToStart)) - self.on[self.service_manager.name].acquire_lock.emit( - callback_override="_restart_opensearch" - ) + self._restart_opensearch_event.emit() def _recompute_roles_if_needed(self, event: RelationChangedEvent): """Recompute node roles:self-healing that didn't trigger leader related event occurred.""" diff --git a/lib/charms/opensearch/v0/opensearch_exceptions.py b/lib/charms/opensearch/v0/opensearch_exceptions.py index 5f3cec646..64c85e4e0 100644 --- a/lib/charms/opensearch/v0/opensearch_exceptions.py +++ b/lib/charms/opensearch/v0/opensearch_exceptions.py @@ -63,10 +63,6 @@ class OpenSearchNotFullyReadyError(OpenSearchError): """Exception thrown when a node is started but not full ready to take on requests.""" -class OpenSearchOpsLockAlreadyAcquiredError(OpenSearchError): - """Exception thrown when a node is started but not full ready to take on requests.""" - - class OpenSearchCmdError(OpenSearchError): """Exception thrown when an OpenSearch bin command fails.""" diff --git a/lib/charms/opensearch/v0/opensearch_locking.py b/lib/charms/opensearch/v0/opensearch_locking.py index 159294cfd..99a2ef1b7 100644 --- a/lib/charms/opensearch/v0/opensearch_locking.py +++ b/lib/charms/opensearch/v0/opensearch_locking.py @@ -1,16 +1,17 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. -"""Class for Setting configuration in opensearch config files.""" +"""Ensure that only one node (re)starts, joins the cluster, or leaves the cluster at a time.""" +import json import logging +import typing -from charms.opensearch.v0.constants_charm import PeerRelationName -from charms.opensearch.v0.opensearch_exceptions import ( - OpenSearchHttpError, - OpenSearchOpsLockAlreadyAcquiredError, -) -from charms.opensearch.v0.opensearch_internal_data import Scope -from tenacity import retry, stop_after_attempt, wait_fixed +import ops +from charms.opensearch.v0.helper_cluster import ClusterTopology +from charms.opensearch.v0.opensearch_exceptions import OpenSearchHttpError + +if typing.TYPE_CHECKING: + import charms.opensearch.v0.opensearch_base_charm as opensearch_base_charm # The unique Charmhub library identifier, never change it LIBID = "0924c6d81c604a15873ad43498cd6895" @@ -25,78 +26,265 @@ logger = logging.getLogger(__name__) -class OpenSearchOpsLock: - """This class covers the configuration changes depending on certain actions.""" +class _PeerRelationLock(ops.Object): + """Fallback lock when all units of OpenSearch are offline.""" - LOCK_INDEX = ".ops_lock" - PEER_DATA_LOCK_FLAG = "ops_removing_unit" + _ENDPOINT_NAME = "node-lock-fallback" - def __init__(self, charm): + def __init__(self, charm: ops.CharmBase): + super().__init__(charm, self._ENDPOINT_NAME) self._charm = charm - self._opensearch = charm.opensearch + self.framework.observe( + self._charm.on[self._ENDPOINT_NAME].relation_changed, self._on_peer_relation_changed + ) + + @property + def acquired(self) -> bool: + """Attempt to acquire lock. - @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.5), reraise=True) - def acquire(self): - """Method for Acquiring the "ops" lock.""" - # no lock acquisition needed if only 1 unit remaining - if len(self._charm.model.get_relation(PeerRelationName).units) == 1: + Returns: + Whether lock was acquired + """ + if not self._relation: + return False + self._relation.data[self._charm.unit]["lock-requested"] = json.dumps(True) + if self._charm.unit.is_leader(): + logger.debug("[Node lock] Requested peer lock as leader unit") + # A separate relation-changed event won't get fired + self._on_peer_relation_changed() + acquired = self._unit_with_lock == self._charm.unit.name + if acquired: + logger.debug("[Node lock] Acquired via peer databag") + else: + logger.debug( + f"[Node lock] Not acquired. Unit with peer databag lock: {self._unit_with_lock}" + ) + return acquired + + def release(self): + """Release lock for this unit.""" + if not self._relation: return + self._relation.data[self._charm.unit].pop("lock-requested", None) + if self._charm.unit.is_leader(): + logger.debug("[Node lock] Released peer lock as leader unit") + # A separate relation-changed event won't get fired + self._on_peer_relation_changed() - # we check first on the peer data bag if the lock is already acquired - if self._is_lock_in_peer_data(): - raise OpenSearchOpsLockAlreadyAcquiredError("Another unit is being removed.") + def _unit_requested_lock(self, unit: ops.Unit): + """Whether unit requested lock.""" + assert self._relation + value = self._relation.data[unit].get("lock-requested") + if not value: + return False + value = json.loads(value) + if not isinstance(value, bool): + raise ValueError + return value - host = self._charm.unit_ip if self._opensearch.is_node_up() else None + @property + def _unit_with_lock(self) -> str | None: + if self._relation: + return self._relation.data[self._charm.app].get("unit-with-lock") - # we can use opensearch to lock - if host is not None or self._charm.alt_hosts: - try: - # attempt lock acquisition through index creation, should crash if index - # already created, meaning another unit is holding the lock - self._opensearch.request( - "PUT", - endpoint=f"/{OpenSearchOpsLock.LOCK_INDEX}", - host=host, - alt_hosts=self._charm.alt_hosts, - retries=3, - ) - self._charm.peers_data.put(Scope.UNIT, OpenSearchOpsLock.PEER_DATA_LOCK_FLAG, True) - return - except OpenSearchHttpError as e: - if e.response_code != 400: - raise - raise OpenSearchOpsLockAlreadyAcquiredError("Another unit is being removed.") + @_unit_with_lock.setter + def _unit_with_lock(self, value: str): + assert self._relation + self._relation.data[self._charm.app]["unit-with-lock"] = value - # we could not use opensearch for locking, we use the peer rel data bag - self._charm.peers_data.put(Scope.UNIT, OpenSearchOpsLock.PEER_DATA_LOCK_FLAG, True) + @_unit_with_lock.deleter + def _unit_with_lock(self): + assert self._relation + self._relation.data[self._charm.app].pop("unit-with-lock", None) - def release(self): - """Method for Releasing the "ops" lock.""" - host = self._charm.unit_ip if self._opensearch.is_node_up() else None + @property + def _relation(self): + # Use property instead of `self._relation =` in `__init__()` because of ops Harness unit + # tests + return self._charm.model.get_relation(self._ENDPOINT_NAME) + + def _on_peer_relation_changed(self, _=None): + """Grant & release lock.""" + if not self._charm.unit.is_leader(): + return + assert self._relation + if self._unit_with_lock and self._unit_requested_lock( + self._charm.model.get_unit(self._unit_with_lock) + ): + # Lock still in use, do not release + logger.debug("[Node lock] (leader) lock still in use") + return + # TODO: adjust which unit gets priority on lock? + for unit in (*self._relation.units, self._charm.unit): + if self._unit_requested_lock(unit): + self._unit_with_lock = unit.name + logger.debug(f"[Node lock] (leader) granted peer lock to {unit.name=}") + break + else: + logger.debug("[Node lock] (leader) cleared peer lock") + del self._unit_with_lock - # can use opensearch to remove lock - if host is not None or self._charm.alt_hosts: + +class OpenSearchNodeLock(ops.Object): + """Ensure that only one node (re)starts, joins the cluster, or leaves the cluster at a time. + + Uses OpenSearch document for lock. Falls back to peer databag if no units online + """ + + _OPENSEARCH_INDEX = ".charm_node_lock" + + def __init__(self, charm: "opensearch_base_charm.OpenSearchBaseCharm"): + super().__init__(charm, "opensearch-node-lock") + self._charm = charm + self._opensearch = charm.opensearch + self._peer = _PeerRelationLock(self._charm) + + def _unit_with_lock(self, host) -> str | None: + """Unit that has acquired OpenSearch lock.""" + try: + document_data = self._opensearch.request( + "GET", + endpoint=f"/{self._OPENSEARCH_INDEX}/_source/0", + host=host, + alt_hosts=self._charm.alt_hosts, + retries=3, + ) + except OpenSearchHttpError as e: + if e.response_code == 404: + # No unit has lock + return + raise + return document_data["unit-name"] + + @property + def acquired(self) -> bool: # noqa: C901 + """Attempt to acquire lock. + + Returns: + Whether lock was acquired + """ + if self._opensearch.is_node_up(): + host = self._charm.unit_ip + else: + host = None + alt_hosts = [host for host in self._charm.alt_hosts if self._opensearch.is_node_up(host)] + if host or alt_hosts: + logger.debug("[Node lock] 1+ opensearch nodes online") try: - self._opensearch.request( - "DELETE", - endpoint=f"/{OpenSearchOpsLock.LOCK_INDEX}", - host=host, - alt_hosts=self._charm.alt_hosts, - retries=3, + online_nodes = len( + ClusterTopology.nodes( + self._opensearch, use_localhost=host is not None, hosts=alt_hosts + ) ) - except OpenSearchHttpError as e: - # ignore 404, it means the index is not found and this just means that - # the cleanup happened before but event got deferred because of another error - if e.response_code != 404: - raise - - self._charm.peers_data.delete(Scope.UNIT, OpenSearchOpsLock.PEER_DATA_LOCK_FLAG) - - def _is_lock_in_peer_data(self) -> bool: - """Method checking if lock acquired from the peer rel data.""" - rel = self._charm.model.get_relation(PeerRelationName) - for unit in rel.units: - if rel.data[unit].get(OpenSearchOpsLock.PEER_DATA_LOCK_FLAG) == "True": + except OpenSearchHttpError: + logger.exception("Error getting OpenSearch nodes") + return False + logger.debug(f"[Node lock] Opensearch {online_nodes=}") + assert online_nodes > 0 + if online_nodes >= 2: + logger.debug("[Node lock] Attempting to acquire opensearch lock") + # Acquire opensearch lock + # Create index if it doesn't exist + try: + self._opensearch.request( + "PUT", + endpoint=f"/{self._OPENSEARCH_INDEX}", + host=host, + alt_hosts=alt_hosts, + retries=3, + payload={"settings": {"index": {"auto_expand_replicas": "0-all"}}}, + ) + except OpenSearchHttpError as e: + if ( + e.response_code == 400 + and e.response_body.get("error", {}).get("type") + == "resource_already_exists_exception" + ): + # Index already created + pass + else: + logger.exception("Error creating OpenSearch lock index") + return False + # Attempt to create document id 0 + try: + self._opensearch.request( + "PUT", + endpoint=f"/{self._OPENSEARCH_INDEX}/_create/0?refresh=true", + host=host, + alt_hosts=alt_hosts, + retries=3, + payload={"unit-name": self._charm.unit.name}, + ) + except OpenSearchHttpError as e: + if e.response_code == 409 and "document already exists" in e.response_body.get( + "error", {} + ).get("reason", ""): + # Document already created + pass + else: + logger.exception("Error creating OpenSearch lock document") + return False + unit = self._unit_with_lock(host) + if unit == self._charm.unit.name: + # Lock acquired + # Release peer databag lock, if any + logger.debug("[Node lock] Acquired via opensearch") + self._peer.release() + logger.debug("[Node lock] Released redundant peer lock (if held)") return True + if unit or online_nodes >= 2: + # Another unit has lock + # (Or document deleted after request to create document & before request in + # `self._unit_with_lock()`) + logger.debug(f"[Node lock] Not acquired. Unit with opensearch lock: {unit}") + return False + # If online_nodes == 1, we should acquire the lock via the peer databag. + # If we acquired the lock via OpenSearch and this unit was stopping, we would be unable + # to release the OpenSearch lock. For example, when scaling to 0. + # Then, when 1+ OpenSearch nodes are online, a unit that no longer exists could hold + # the lock. + # Note: if online_nodes > 1, this situation is still possible (e.g. if this unit was + # stopping and another unit went offline simultaneously)—but it's an edge case we don't + # support (to reduce complexity & improve robustness in other cases). + # If online_nodes > 1, we should re-attempt to acquire the OpenSearch lock. + logger.debug("[Node lock] No unit has opensearch lock") + logger.debug("[Node lock] Using peer databag for lock") + # Request peer databag lock + # If return value is True: + # - Lock granted in previous Juju event + # - OR, unit is leader & lock granted in this Juju event + return self._peer.acquired + + def release(self): + """Release lock. - return False + Limitation: if lock acquired via OpenSearch document and all units offline, OpenSearch + document lock will not be released + """ + logger.debug("[Node lock] Releasing lock") + if self._opensearch.is_node_up(): + host = self._charm.unit_ip + else: + host = None + alt_hosts = [host for host in self._charm.alt_hosts if self._opensearch.is_node_up(host)] + if host or alt_hosts: + logger.debug("[Node lock] Checking which unit has opensearch lock") + # Check if this unit currently has lock + if self._unit_with_lock(host) == self._charm.unit.name: + logger.debug("[Node lock] Releasing opensearch lock") + # Delete document id 0 + try: + self._opensearch.request( + "DELETE", + endpoint=f"/{self._OPENSEARCH_INDEX}/_doc/0?refresh=true", + host=host, + alt_hosts=alt_hosts, + retries=3, + ) + except OpenSearchHttpError as e: + if e.response_code != 404: + raise + logger.debug("[Node lock] Released opensearch lock") + self._peer.release() + logger.debug("[Node lock] Released peer lock (if held)") + logger.debug("[Node lock] Released lock") diff --git a/lib/charms/rolling_ops/v0/rollingops.py b/lib/charms/rolling_ops/v0/rollingops.py deleted file mode 100644 index 5a7d4ce30..000000000 --- a/lib/charms/rolling_ops/v0/rollingops.py +++ /dev/null @@ -1,415 +0,0 @@ -# Copyright 2022 Canonical Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This library enables "rolling" operations across units of a charmed Application. - -For example, a charm author might use this library to implement a "rolling restart", in -which all units in an application restart their workload, but no two units execute the -restart at the same time. - -To implement the rolling restart, a charm author would do the following: - -1. Add a peer relation called 'restart' to a charm's `metadata.yaml`: -```yaml -peers: - restart: - interface: rolling_op -``` - -Import this library into src/charm.py, and initialize a RollingOpsManager in the Charm's -`__init__`. The Charm should also define a callback routine, which will be executed when -a unit holds the distributed lock: - -src/charm.py -```python -# ... -from charms.rolling_ops.v0.rollingops import RollingOpsManager -# ... -class SomeCharm(...): - def __init__(...) - # ... - self.restart_manager = RollingOpsManager( - charm=self, relation="restart", callback=self._restart - ) - # ... - def _restart(self, event): - systemd.service_restart('foo') -``` - -To kick off the rolling restart, emit this library's AcquireLock event. The simplest way -to do so would be with an action, though it might make sense to acquire the lock in -response to another event. - -```python - def _on_trigger_restart(self, event): - self.charm.on[self.restart_manager.name].acquire_lock.emit() -``` - -In order to trigger the restart, a human operator would execute the following command on -the CLI: - -``` -juju run-action some-charm/0 some-charm/1 <... some-charm/n> restart -``` - -Note that all units that plan to restart must receive the action and emit the aquire -event. Any units that do not run their acquire handler will be left out of the rolling -restart. (An operator might take advantage of this fact to recover from a failed rolling -operation without restarting workloads that were able to successfully restart -- simply -omit the successful units from a subsequent run-action call.) - -""" -import logging -from enum import Enum -from typing import AnyStr, Callable, Optional - -from ops.charm import ActionEvent, CharmBase, RelationChangedEvent -from ops.framework import EventBase, Object -from ops.model import ActiveStatus, MaintenanceStatus, WaitingStatus - -logger = logging.getLogger(__name__) - -# The unique Charmhub library identifier, never change it -LIBID = "20b7777f58fe421e9a223aefc2b4d3a4" - -# Increment this major API version when introducing breaking changes -LIBAPI = 0 - -# Increment this PATCH version before using `charmcraft publish-lib` or reset -# to 0 if you are raising the major API version -LIBPATCH = 5 - - -class LockNoRelationError(Exception): - """Raised if we are trying to process a lock, but do not appear to have a relation yet.""" - - pass - - -class LockState(Enum): - """Possible states for our Distributed lock. - - Note that there are two states set on the unit, and two on the application. - - """ - - ACQUIRE = "acquire" - RELEASE = "release" - GRANTED = "granted" - IDLE = "idle" - - -class Lock: - """A class that keeps track of a single asynchronous lock. - - Warning: a Lock has permission to update relation data, which means that there are - side effects to invoking the .acquire, .release and .grant methods. Running any one of - them will trigger a RelationChanged event, once per transition from one internal - status to another. - - This class tracks state across the cloud by implementing a peer relation - interface. There are two parts to the interface: - - 1) The data on a unit's peer relation (defined in metadata.yaml.) Each unit can update - this data. The only meaningful values are "acquire", and "release", which represent - a request to acquire the lock, and a request to release the lock, respectively. - - 2) The application data in the relation. This tracks whether the lock has been - "granted", Or has been released (and reverted to idle). There are two valid states: - "granted" or None. If a lock is in the "granted" state, a unit should emit a - RunWithLocks event and then release the lock. - - If a lock is in "None", this means that a unit has not yet requested the lock, or - that the request has been completed. - - In more detail, here is the relation structure: - - relation.data: - : - status: 'acquire|release' - : - : 'granted|None' - - Note that this class makes no attempts to timestamp the locks and thus handle multiple - requests in a row. If a unit re-requests a lock before being granted the lock, the - lock will simply stay in the "acquire" state. If a unit wishes to clear its lock, it - simply needs to call lock.release(). - - """ - - def __init__(self, manager, unit=None): - - self.relation = manager.model.relations[manager.name][0] - if not self.relation: - # TODO: defer caller in this case (probably just fired too soon). - raise LockNoRelationError() - - self.unit = unit or manager.model.unit - self.app = manager.model.app - - @property - def _state(self) -> LockState: - """Return an appropriate state. - - Note that the state exists in the unit's relation data, and the application - relation data, so we have to be careful about what our states mean. - - Unit state can only be in "acquire", "release", "None" (None means unset) - Application state can only be in "granted" or "None" (None means unset or released) - - """ - unit_state = LockState(self.relation.data[self.unit].get("state", LockState.IDLE.value)) - app_state = LockState( - self.relation.data[self.app].get(str(self.unit), LockState.IDLE.value) - ) - - if app_state == LockState.GRANTED and unit_state == LockState.RELEASE: - # Active release request. - return LockState.RELEASE - - if app_state == LockState.IDLE and unit_state == LockState.ACQUIRE: - # Active acquire request. - return LockState.ACQUIRE - - return app_state # Granted or unset/released - - @_state.setter - def _state(self, state: LockState): - """Set the given state. - - Since we update the relation data, this may fire off a RelationChanged event. - """ - if state == LockState.ACQUIRE: - self.relation.data[self.unit].update({"state": state.value}) - - if state == LockState.RELEASE: - self.relation.data[self.unit].update({"state": state.value}) - - if state == LockState.GRANTED: - self.relation.data[self.app].update({str(self.unit): state.value}) - - if state is LockState.IDLE: - self.relation.data[self.app].update({str(self.unit): state.value}) - - def acquire(self): - """Request that a lock be acquired.""" - self._state = LockState.ACQUIRE - - def release(self): - """Request that a lock be released.""" - self._state = LockState.RELEASE - - def clear(self): - """Unset a lock.""" - self._state = LockState.IDLE - - def grant(self): - """Grant a lock to a unit.""" - self._state = LockState.GRANTED - - def is_held(self): - """This unit holds the lock.""" - return self._state == LockState.GRANTED - - def release_requested(self): - """A unit has reported that they are finished with the lock.""" - return self._state == LockState.RELEASE - - def is_pending(self): - """Is this unit waiting for a lock?""" - return self._state == LockState.ACQUIRE - - -class Locks: - """Generator that returns a list of locks.""" - - def __init__(self, manager): - self.manager = manager - - # Gather all the units. - relation = manager.model.relations[manager.name][0] - units = [unit for unit in relation.units] - - # Plus our unit ... - units.append(manager.model.unit) - - self.units = units - - def __iter__(self): - """Yields a lock for each unit we can find on the relation.""" - for unit in self.units: - yield Lock(self.manager, unit=unit) - - -class RunWithLock(EventBase): - """Event to signal that this unit should run the callback.""" - - pass - - -class AcquireLock(EventBase): - """Signals that this unit wants to acquire a lock.""" - - def __init__(self, handle, callback_override: Optional[str] = None): - super().__init__(handle) - self.callback_override = callback_override or "" - - def snapshot(self): - return {"callback_override": self.callback_override} - - def restore(self, snapshot): - self.callback_override = snapshot["callback_override"] - - -class ProcessLocks(EventBase): - """Used to tell the leader to process all locks.""" - - pass - - -class RollingOpsManager(Object): - """Emitters and handlers for rolling ops.""" - - def __init__(self, charm: CharmBase, relation: AnyStr, callback: Callable): - """Register our custom events. - - params: - charm: the charm we are attaching this to. - relation: an identifier, by convention based on the name of the relation in the - metadata.yaml, which identifies this instance of RollingOperatorsFactory, - distinct from other instances that may be hanlding other events. - callback: a closure to run when we have a lock. (It must take a CharmBase object and - EventBase object as args.) - """ - # "Inherit" from the charm's class. This gives us access to the framework as - # self.framework, as well as the self.model shortcut. - super().__init__(charm, None) - - self.name = relation - self._callback = callback - self.charm = charm # Maintain a reference to charm, so we can emit events. - - charm.on.define_event("{}_run_with_lock".format(self.name), RunWithLock) - charm.on.define_event("{}_acquire_lock".format(self.name), AcquireLock) - charm.on.define_event("{}_process_locks".format(self.name), ProcessLocks) - - # Watch those events (plus the built in relation event). - self.framework.observe(charm.on[self.name].relation_changed, self._on_relation_changed) - self.framework.observe(charm.on[self.name].acquire_lock, self._on_acquire_lock) - self.framework.observe(charm.on[self.name].run_with_lock, self._on_run_with_lock) - self.framework.observe(charm.on[self.name].process_locks, self._on_process_locks) - - def _callback(self: CharmBase, event: EventBase) -> None: - """Placeholder for the function that actually runs our event. - - Usually overridden in the init. - """ - raise NotImplementedError - - def _on_relation_changed(self: CharmBase, event: RelationChangedEvent): - """Process relation changed. - - First, determine whether this unit has been granted a lock. If so, emit a RunWithLock - event. - - Then, if we are the leader, fire off a process locks event. - - """ - lock = Lock(self) - - if lock.is_pending(): - self.model.unit.status = WaitingStatus("Awaiting {} operation".format(self.name)) - - if lock.is_held(): - self.charm.on[self.name].run_with_lock.emit() - - if self.model.unit.is_leader(): - self.charm.on[self.name].process_locks.emit() - - def _on_process_locks(self: CharmBase, event: ProcessLocks): - """Process locks. - - Runs only on the leader. Updates the status of all locks. - - """ - if not self.model.unit.is_leader(): - return - - pending = [] - - for lock in Locks(self): - if lock.is_held(): - # One of our units has the lock -- return without further processing. - return - - if lock.release_requested(): - lock.clear() # Updates relation data - - if lock.is_pending(): - if lock.unit == self.model.unit: - # Always run on the leader last. - pending.insert(0, lock) - else: - pending.append(lock) - - # If we reach this point, and we have pending units, we want to grant a lock to - # one of them. - if pending: - self.model.app.status = MaintenanceStatus("Beginning rolling {}".format(self.name)) - lock = pending[-1] - lock.grant() - if lock.unit == self.model.unit: - # It's time for the leader to run with lock. - self.charm.on[self.name].run_with_lock.emit() - return - - if self.model.app.status.message == f"Beginning rolling {self.name}": - self.model.app.status = ActiveStatus() - - def _on_acquire_lock(self: CharmBase, event: ActionEvent): - """Request a lock.""" - try: - Lock(self).acquire() # Updates relation data - # emit relation changed event in the edge case where aquire does not - relation = self.model.get_relation(self.name) - - # persist callback override for eventual run - relation.data[self.charm.unit].update({"callback_override": event.callback_override}) - self.charm.on[self.name].relation_changed.emit(relation, app=self.charm.app) - - except LockNoRelationError: - logger.debug("No {} peer relation yet. Delaying rolling op.".format(self.name)) - event.defer() - - def _on_run_with_lock(self: CharmBase, event: RunWithLock): - lock = Lock(self) - self.model.unit.status = MaintenanceStatus("Executing {} operation".format(self.name)) - relation = self.model.get_relation(self.name) - - # default to instance callback if not set - callback_name = relation.data[self.charm.unit].get( - "callback_override", self._callback.__name__ - ) - callback = getattr(self.charm, callback_name) - callback(event) - - lock.release() # Updates relation data - if lock.unit == self.model.unit: - self.charm.on[self.name].process_locks.emit() - - # cleanup old callback overrides - relation.data[self.charm.unit].update({"callback_override": ""}) - - if self.model.unit.status.message == f"Executing {self.name} operation": - self.model.unit.status = ActiveStatus() diff --git a/metadata.yaml b/metadata.yaml index a163f66e1..6b1c0bea9 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -13,8 +13,8 @@ summary: | peers: opensearch-peers: interface: opensearch_peers - service: - interface: rolling_op + node-lock-fallback: + interface: node_lock_fallback provides: peer-cluster-orchestrator: diff --git a/tests/unit/lib/test_backups.py b/tests/unit/lib/test_backups.py index e5a6824a3..e1b990720 100644 --- a/tests/unit/lib/test_backups.py +++ b/tests/unit/lib/test_backups.py @@ -70,7 +70,6 @@ def harness(): ) # Replace some unused methods that will be called as part of set_leader with mock - charm.service_manager._update_locks = MagicMock() charm._put_admin_user = MagicMock() harness_obj.add_relation(PeerRelationName, "opensearch") harness_obj.set_leader(is_leader=True) diff --git a/tests/unit/lib/test_helper_cluster.py b/tests/unit/lib/test_helper_cluster.py index 6f93dc13f..463fb805b 100644 --- a/tests/unit/lib/test_helper_cluster.py +++ b/tests/unit/lib/test_helper_cluster.py @@ -124,76 +124,77 @@ def setUp(self) -> None: self.opensearch = self.charm.opensearch - def test_topology_roles_suggestion_odd_number_of_planned_units(self): - """Test the suggestion of roles for a new node and odd numbers of planned units.""" - planned_units = 5 - cluster_5_conf = self.cluster1_5_nodes_conf() - - self.assertCountEqual(ClusterTopology.suggest_roles([], planned_units), self.cm_roles) - for start_index in range(1, 5): - self.assertCountEqual( - ClusterTopology.suggest_roles(cluster_5_conf[:start_index], planned_units), - self.cm_roles, - ) - - def test_topology_roles_suggestion_even_number_of_planned_units(self): - """Test the suggestion of roles for a new node and even numbers of planned units.""" - cluster_6_conf = self.cluster1_6_nodes_conf() - - planned_units = 6 - - self.assertCountEqual(ClusterTopology.suggest_roles([], planned_units), self.cm_roles) - for start_index in range(1, 5): - self.assertCountEqual( - ClusterTopology.suggest_roles(cluster_6_conf[:start_index], planned_units), - self.cm_roles, - ) - - self.assertCountEqual( - ClusterTopology.suggest_roles(cluster_6_conf[:-1], planned_units), - self.base_roles, - ) - - def test_auto_recompute_node_roles_in_cluster_6(self): - """Test the automatic suggestion of new roles to an existing node.""" - cluster_conf = {node.name: node for node in self.cluster1_6_nodes_conf()} - - # remove a cluster manager node - old_cluster_conf = cluster_conf.copy() - old_cluster_conf.pop("cm1") - new_cluster_conf = ClusterTopology.recompute_nodes_conf( - app_name=self.cluster1, nodes=list(old_cluster_conf.values()) - ) - assert new_cluster_conf["data1"].roles == self.cm_roles - # Assert other remaining nodes unchanged - old_cluster_conf.pop("data1") - new_cluster_conf.pop("data1") - assert old_cluster_conf == new_cluster_conf - - # remove a data node - old_cluster_conf = cluster_conf.copy() - old_cluster_conf.pop("data1") - new_cluster_conf = ClusterTopology.recompute_nodes_conf( - app_name=self.cluster1, nodes=list(old_cluster_conf.values()) - ) - # Assert all remaining nodes unchanged - assert old_cluster_conf == new_cluster_conf - - def test_auto_recompute_node_roles_in_cluster_5(self): - """Test the automatic suggestion of new roles to an existing node.""" - cluster_conf = {node.name: node for node in self.cluster1_5_nodes_conf()} - - # remove a cluster manager node - old_cluster_conf = cluster_conf.copy() - old_cluster_conf.pop("cm1") - new_cluster_conf = ClusterTopology.recompute_nodes_conf( - app_name=self.cluster1, nodes=list(old_cluster_conf.values()) - ) - assert new_cluster_conf["cm5"].roles == self.base_roles - # Assert other remaining nodes unchanged - old_cluster_conf.pop("cm5") - new_cluster_conf.pop("cm5") - assert old_cluster_conf == new_cluster_conf + # TODO: remove in https://github.com/canonical/opensearch-operator/issues/230 + # def test_topology_roles_suggestion_odd_number_of_planned_units(self): + # """Test the suggestion of roles for a new node and odd numbers of planned units.""" + # planned_units = 5 + # cluster_5_conf = self.cluster1_5_nodes_conf() + # + # self.assertCountEqual(ClusterTopology.suggest_roles([], planned_units), self.cm_roles) + # for start_index in range(1, 5): + # self.assertCountEqual( + # ClusterTopology.suggest_roles(cluster_5_conf[:start_index], planned_units), + # self.cm_roles, + # ) + # + # def test_topology_roles_suggestion_even_number_of_planned_units(self): + # """Test the suggestion of roles for a new node and even numbers of planned units.""" + # cluster_6_conf = self.cluster1_6_nodes_conf() + # + # planned_units = 6 + # + # self.assertCountEqual(ClusterTopology.suggest_roles([], planned_units), self.cm_roles) + # for start_index in range(1, 5): + # self.assertCountEqual( + # ClusterTopology.suggest_roles(cluster_6_conf[:start_index], planned_units), + # self.cm_roles, + # ) + # + # self.assertCountEqual( + # ClusterTopology.suggest_roles(cluster_6_conf[:-1], planned_units), + # self.base_roles, + # ) + # + # def test_auto_recompute_node_roles_in_cluster_6(self): + # """Test the automatic suggestion of new roles to an existing node.""" + # cluster_conf = {node.name: node for node in self.cluster1_6_nodes_conf()} + # + # # remove a cluster manager node + # old_cluster_conf = cluster_conf.copy() + # old_cluster_conf.pop("cm1") + # new_cluster_conf = ClusterTopology.recompute_nodes_conf( + # app_name=self.cluster1, nodes=list(old_cluster_conf.values()) + # ) + # assert new_cluster_conf["data1"].roles == self.cm_roles + # # Assert other remaining nodes unchanged + # old_cluster_conf.pop("data1") + # new_cluster_conf.pop("data1") + # assert old_cluster_conf == new_cluster_conf + # + # # remove a data node + # old_cluster_conf = cluster_conf.copy() + # old_cluster_conf.pop("data1") + # new_cluster_conf = ClusterTopology.recompute_nodes_conf( + # app_name=self.cluster1, nodes=list(old_cluster_conf.values()) + # ) + # # Assert all remaining nodes unchanged + # assert old_cluster_conf == new_cluster_conf + # + # def test_auto_recompute_node_roles_in_cluster_5(self): + # """Test the automatic suggestion of new roles to an existing node.""" + # cluster_conf = {node.name: node for node in self.cluster1_5_nodes_conf()} + # + # # remove a cluster manager node + # old_cluster_conf = cluster_conf.copy() + # old_cluster_conf.pop("cm1") + # new_cluster_conf = ClusterTopology.recompute_nodes_conf( + # app_name=self.cluster1, nodes=list(old_cluster_conf.values()) + # ) + # assert new_cluster_conf["cm5"].roles == self.base_roles + # # Assert other remaining nodes unchanged + # old_cluster_conf.pop("cm5") + # new_cluster_conf.pop("cm5") + # assert old_cluster_conf == new_cluster_conf def test_auto_recompute_node_roles_in_previous_non_auto_gen_cluster(self): """Test the automatic suggestion of new roles to an existing node.""" diff --git a/tests/unit/lib/test_ml_plugins.py b/tests/unit/lib/test_ml_plugins.py index ac26c77e4..f2d552c69 100644 --- a/tests/unit/lib/test_ml_plugins.py +++ b/tests/unit/lib/test_ml_plugins.py @@ -8,7 +8,6 @@ import charms from charms.opensearch.v0.opensearch_health import HealthColors from charms.opensearch.v0.opensearch_plugins import OpenSearchKnn, PluginState -from charms.rolling_ops.v0.rollingops import RollingOpsManager from ops.testing import Harness from charm import OpenSearchOperatorCharm @@ -63,7 +62,10 @@ def setUp(self) -> None: @patch( f"{BASE_LIB_PATH}.opensearch_peer_clusters.OpenSearchPeerClustersManager.deployment_desc" ) - @patch.object(RollingOpsManager, "_on_acquire_lock") + @patch( + "charms.opensearch.v0.opensearch_locking.OpenSearchNodeLock.acquired", + new_callable=PropertyMock, + ) @patch( "charms.opensearch.v0.opensearch_distro.OpenSearchDistribution.version", new_callable=PropertyMock, @@ -81,7 +83,7 @@ def test_disable_via_config_change( mock_status, mock_is_enabled, mock_version, - mock_acquire_lock, + mock_lock_acquired, ___, mock_is_node_up, ) -> None: @@ -98,9 +100,10 @@ def test_disable_via_config_change( mock_is_node_up.return_value = True self.charm._get_nodes = MagicMock(return_value=[1]) self.charm.planned_units = MagicMock(return_value=1) + mock_lock_acquired.return_value = False self.harness.update_config({"plugin_opensearch_knn": False}) - mock_acquire_lock.assert_called_once() + mock_lock_acquired.assert_called_once() self.plugin_manager._opensearch_config.add_plugin.assert_called_once_with( {"knn.plugin.enabled": "false"} ) diff --git a/tests/unit/lib/test_opensearch_base_charm.py b/tests/unit/lib/test_opensearch_base_charm.py index 917fd76e5..7265369d5 100644 --- a/tests/unit/lib/test_opensearch_base_charm.py +++ b/tests/unit/lib/test_opensearch_base_charm.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta from unittest.mock import MagicMock, patch +import charms.opensearch.v0.opensearch_locking as opensearch_locking from charms.opensearch.v0.constants_tls import CertType from charms.opensearch.v0.models import ( DeploymentDescription, @@ -18,7 +19,7 @@ StartMode, State, ) -from charms.opensearch.v0.opensearch_base_charm import SERVICE_MANAGER, PeerRelationName +from charms.opensearch.v0.opensearch_base_charm import PeerRelationName from charms.opensearch.v0.opensearch_exceptions import ( OpenSearchHttpError, OpenSearchInstallError, @@ -77,7 +78,9 @@ def setUp(self) -> None: self.peers_data = self.charm.peers_data self.rel_id = self.harness.add_relation(PeerRelationName, self.charm.app.name) - self.service_rel_id = self.harness.add_relation(SERVICE_MANAGER, self.charm.app.name) + self.lock_fallback_rel_id = self.harness.add_relation( + opensearch_locking._PeerRelationLock._ENDPOINT_NAME, self.charm.app.name + ) self.OPENSEARCH_DISTRO = ( f"{self.opensearch.__class__.__module__}.{self.opensearch.__class__.__name__}" diff --git a/tests/unit/lib/test_opensearch_relation_provider.py b/tests/unit/lib/test_opensearch_relation_provider.py index 7f664312a..7dbcfc009 100644 --- a/tests/unit/lib/test_opensearch_relation_provider.py +++ b/tests/unit/lib/test_opensearch_relation_provider.py @@ -4,8 +4,8 @@ import unittest from unittest.mock import MagicMock, PropertyMock, patch +import charms.opensearch.v0.opensearch_locking as opensearch_locking from charms.opensearch.v0.constants_charm import ClientRelationName, PeerRelationName -from charms.opensearch.v0.opensearch_base_charm import SERVICE_MANAGER from charms.opensearch.v0.opensearch_internal_data import Scope from charms.opensearch.v0.opensearch_users import OpenSearchUserMgmtError from ops.model import ActiveStatus, BlockedStatus @@ -28,7 +28,9 @@ def setUp(self): self.opensearch_provider = self.charm.opensearch_provider self.peers_rel_id = self.harness.add_relation(PeerRelationName, self.charm.app.name) - self.service_rel_id = self.harness.add_relation(SERVICE_MANAGER, self.charm.app.name) + self.lock_fallback_rel_id = self.harness.add_relation( + opensearch_locking._PeerRelationLock._ENDPOINT_NAME, self.charm.app.name + ) # Define an opensearch_provider relation self.client_rel_id = self.harness.add_relation(ClientRelationName, "application") diff --git a/tests/unit/lib/test_opensearch_secrets.py b/tests/unit/lib/test_opensearch_secrets.py index 77f6cfda8..88132d2d8 100644 --- a/tests/unit/lib/test_opensearch_secrets.py +++ b/tests/unit/lib/test_opensearch_secrets.py @@ -3,9 +3,9 @@ from unittest.mock import MagicMock, patch +import charms.opensearch.v0.opensearch_locking as opensearch_locking from charms.opensearch.v0.constants_charm import ClientRelationName, PeerRelationName from charms.opensearch.v0.constants_tls import CertType -from charms.opensearch.v0.opensearch_base_charm import SERVICE_MANAGER from charms.opensearch.v0.opensearch_internal_data import Scope from ops import JujuVersion from ops.testing import Harness @@ -43,7 +43,9 @@ def setUp(self): JujuVersion.from_environ = MagicMock(return_value=JujuVersionMock()) self.peers_rel_id = self.harness.add_relation(PeerRelationName, self.charm.app.name) - self.service_rel_id = self.harness.add_relation(SERVICE_MANAGER, self.charm.app.name) + self.lock_fallback_rel_id = self.harness.add_relation( + opensearch_locking._PeerRelationLock._ENDPOINT_NAME, self.charm.app.name + ) self.client_rel_id = self.harness.add_relation(ClientRelationName, "application") self.harness.add_relation_unit(self.client_rel_id, "application/0")