Use OpenSearch for locking with fallback to peer databag (#211)
## Issue
### Fix issues with current lock implementation
- Fixes #183: units can start/restart without holding the rollingops lock (if an event is deferred, the lock is released, but the unit (re)starts on the next event anyway)
- A unit can scale down (using the "ops lock") while another unit is (re)starting (using the separate rollingops lock)

### Prepare for in-place upgrades
- In order to maintain HA, only one unit should start, restart, join the cluster (scale up), leave the cluster (scale down), or upgrade at a time
- Upgrades add an additional challenge: if the charm code is upgraded (in addition to the workload), it's possible for the leader unit to go into an error state, which prevents coordination of locks via the peer databag (the current rolling ops implementation) and can block rollback for the other units

## Options considered
1. Use the peer relation databag for the lock (see the sketch after this list)
    - Unit requests the lock by adding data to its unit databag -> relation-changed event triggered on the leader
    - Leader grants the lock by adding data to the app databag -> relation-changed event triggered on the unit
    - Unit proceeds & releases the lock from its unit databag
2. Don't use a lock for upgrades. Make the upgrade order deterministic (so that each unit can independently determine the upgrade order & the order does not change) and each unit upgrades when it sees that the units before it have upgraded
3. Use an OpenSearch index/document for the lock
4. Option 3, but fall back to option 1 if ~~no units online~~ fewer than 2 units are online
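
For reference, a minimal sketch of how option 1's leader-coordinated peer-databag lock could look with the ops framework. The relation name, databag keys, and the `_restart` placeholder are illustrative assumptions, not the charm's actual implementation:

```python
from ops.charm import CharmBase, RelationChangedEvent

PEER = "opensearch-peers"  # assumed peer relation name (illustrative)


class DatabagLockSketch(CharmBase):
    """Leader-coordinated lock over the peer relation databags (sketch only)."""

    def __init__(self, *args):
        super().__init__(*args)
        self.framework.observe(self.on[PEER].relation_changed, self._on_peer_changed)

    def request_lock(self) -> None:
        # Writing to our unit databag triggers relation-changed on the leader.
        self.model.get_relation(PEER).data[self.unit]["lock-requested"] = "true"

    def _on_peer_changed(self, _: RelationChangedEvent) -> None:
        relation = self.model.get_relation(PEER)
        app_data = relation.data[self.app]
        units = relation.units | {self.unit}
        requesting = [u for u in units if relation.data[u].get("lock-requested") == "true"]
        if self.unit.is_leader():
            holder = app_data.get("lock-granted-to")
            if holder and holder not in {u.name for u in requesting}:
                app_data["lock-granted-to"] = ""  # holder dropped its request: free the lock
                holder = ""
            if not holder and requesting:
                # Grant to exactly one unit; writing the app databag triggers
                # relation-changed on that unit.
                app_data["lock-granted-to"] = requesting[0].name
        if app_data.get("lock-granted-to") == self.unit.name:
            self._restart()  # the operation guarded by the lock
            relation.data[self.unit]["lock-requested"] = ""  # release the request

    def _restart(self) -> None:
        """Placeholder for the guarded (re)start logic."""
```

Every grant in this scheme depends on the leader running its relation-changed hook successfully, which is exactly the weakness listed as con 1 below.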

Cons of each option:
1. if the leader unit is in an error state (raises an uncaught exception), rollback will be blocked for all units
2. a unit can restart for non-upgrade-related reasons (role re-balancing after a network cut or scale-up) while another unit is restarting to upgrade
3. doesn't work if all units are offline (initial start, full cluster crash/network cut)
4. implementation complexity; some edge cases are not supported, e.g.
https://github.com/canonical/opensearch-operator/blob/b1b28c10f2bd70bee4270707b616d8d565b8b616/lib/charms/opensearch/v0/opensearch_locking.py#L227-L228

Pros of each option:
1. only one unit will (re)start at a time, for any reason
2. rollback with `juju refresh` will immediately roll back the highest unit even if the leader/other units are in an error state
3. only one unit will (re)start at a time, for any reason, and rollback with `juju refresh` will quickly roll back the highest unit even if the leader unit's charm code is in an error state
4. same as 3, and locking (mostly, except the aforementioned edge cases) works when all units are offline

More context:
Discussion:
https://chat.canonical.com/canonical/pl/9fah5tfxd38ybnx3tq7zugxfyh
Option 1 in the discussion is option 2 here; option 2 in the discussion is option 1 here.

**Option chosen:** Option 4

### OpenSearch index vs document for lock
The current "ops lock" implementation uses an OpenSearch index:
each unit requests the lock by trying to create the index. If the index
does not already exist (the create succeeds), the "lock" is granted.

However, if a unit acquires the lock, the charm goes into an error state, and
the error state is later resolved (e.g. after rollback), the lock becomes
unusable: no unit will be aware that it holds the lock and no unit will be
able to release it.

Solution: use a document with id 0 that stores the "unit-name" as the lock
(see the sketch below)
Discussion:
https://chat.canonical.com/canonical/pl/biddxzzk3fbpjgbhmatzr8n6bw
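
A minimal sketch of the document-as-lock idea using the OpenSearch document APIs (`_create` refuses to overwrite an existing document). The endpoint, index name, and auth handling are assumptions for illustration; the charm's real helper (`OpenSearchNodeLock`, added in this PR) goes through the charm's own HTTP wrapper:

```python
import requests

HOST = "https://localhost:9200"  # assumed node endpoint
INDEX = ".charm_node_lock"       # hypothetical lock index name
UNIT = "opensearch/1"            # this unit's name


def acquire_lock(session: requests.Session) -> bool:
    """Try to create document id 0; at most one unit can succeed."""
    # Real deployments also need TLS verification and credentials on the session.
    response = session.put(
        f"{HOST}/{INDEX}/_create/0",  # _create returns 409 if document 0 already exists
        json={"unit-name": UNIT},
        params={"refresh": "true"},
    )
    if response.status_code == 409:
        # Document already exists: we still hold the lock if it names this unit
        # (e.g. we acquired it, then the charm hit an error state and re-ran).
        current = session.get(f"{HOST}/{INDEX}/_source/0")
        return current.ok and current.json().get("unit-name") == UNIT
    return response.ok


def release_lock(session: requests.Session) -> None:
    """Delete document id 0 so another unit can acquire the lock."""
    session.delete(f"{HOST}/{INDEX}/_doc/0", params={"refresh": "true"})
```

Because the document records which unit holds the lock, a unit whose charm errored after acquiring it can recognise its own lock on a later attempt and release or reuse it, which the bare create-an-index approach cannot do.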

## Solution
### Design
(Option 4): Use an OpenSearch document as the lock (for any (re)start, cluster
join, cluster leave, or upgrade). Fall back to the peer databag if all units
are offline (see the sketch below).
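
A condensed sketch of the fallback decision, with stand-in callables (the helper names are assumptions; the real charm wires this into `OpenSearchNodeLock` and the peer relation):

```python
from typing import Callable


def acquire_node_lock(
    opensearch_reachable: Callable[[], bool],       # any unit able to serve the lock index?
    acquire_document_lock: Callable[[], bool],      # document id 0, as sketched above
    acquire_peer_databag_lock: Callable[[], bool],  # leader-coordinated databag lock
) -> bool:
    """Prefer the OpenSearch document lock; fall back to the peer databag."""
    if opensearch_reachable():
        return acquire_document_lock()
    # All units offline (initial start, full cluster crash/network cut):
    # the document lock cannot be served, so use the peer databag.
    return acquire_peer_databag_lock()
```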

### Implementation
Create custom events `_StartOpenSearch` and `_RestartOpenSearch`:

https://github.com/canonical/opensearch-operator/blob/b1b28c10f2bd70bee4270707b616d8d565b8b616/lib/charms/opensearch/v0/opensearch_base_charm.py#L121-L132

When OpenSearch should be (re)started, emit the custom event.
The custom event requests the lock. If granted, it (re)starts OpenSearch.
Once OpenSearch is fully ready, the lock is released.
If OpenSearch fails to start, the lock is also released.
While OpenSearch is starting or while the lock is not granted, the
custom event is continually deferred.

Note: the original event is not deferred, only the custom event. This is
so that any logic that ran before the request to (re)start OpenSearch
does not get re-run.

By requesting the lock within the custom event, and attempting to
reacquire the lock each time the custom event runs (i.e. after every
time it's deferred), we solve the design issue with rollingops and
deferred events detailed in #183 (see the sketch below).
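
A condensed, self-contained sketch of the pattern (adapted from the `_start_opensearch` handler in this PR; the stub lock and `_start_service` placeholder are simplifications, and in the PR reading `node_lock.acquired` is what attempts acquisition):

```python
from ops.charm import CharmBase
from ops.framework import EventBase, EventSource


class _StartOpenSearch(EventBase):
    """Attempt to acquire the lock & start OpenSearch (deferred until started)."""


class _StubLock:
    """Stand-in for OpenSearchNodeLock so the sketch is self-contained."""

    acquired = False

    def release(self) -> None:
        self.acquired = False


class SketchCharm(CharmBase):
    """Illustrates the emit/defer pattern described above."""

    _start_opensearch_event = EventSource(_StartOpenSearch)

    def __init__(self, *args):
        super().__init__(*args)
        self.node_lock = _StubLock()
        self.framework.observe(self._start_opensearch_event, self._start_opensearch)
        self.framework.observe(self.on.config_changed, self._on_config_changed)

    def _on_config_changed(self, _) -> None:
        # The original event completes normally; only the custom event below is
        # ever deferred, so logic that already ran here is never re-run.
        self._start_opensearch_event.emit()

    def _start_opensearch(self, event: _StartOpenSearch) -> None:
        # Re-attempt lock acquisition every time this (possibly deferred) event runs.
        if not self.node_lock.acquired:
            event.defer()
            return
        try:
            self._start_service()  # placeholder for the real start sequence
        except Exception:
            self.node_lock.release()  # failed start: let another unit proceed
            event.defer()
            return
        # In the real charm the lock is released later, once OpenSearch is
        # fully ready (post-start init).

    def _start_service(self) -> None:
        """Placeholder; the real charm starts the OpenSearch service here."""
```
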
carlcsaposs-canonical authored Apr 17, 2024
1 parent a055793 commit 15c7368
Showing 12 changed files with 415 additions and 637 deletions.
23 changes: 5 additions & 18 deletions lib/charms/opensearch/v0/helper_cluster.py
@@ -46,14 +46,8 @@ def suggest_roles(nodes: List[Node], planned_units: int) -> List[str]:
— odd: "all" the nodes are cm_eligible nodes.
— even: "all - 1" are cm_eligible and 1 data node.
"""
max_cms = ClusterTopology.max_cluster_manager_nodes(planned_units)

base_roles = ["data", "ingest", "ml", "coordinating_only"]
full_roles = base_roles + ["cluster_manager"]
nodes_by_roles = ClusterTopology.nodes_count_by_role(nodes)
if nodes_by_roles.get("cluster_manager", 0) == max_cms:
return base_roles
return full_roles
# TODO: remove in https://github.com/canonical/opensearch-operator/issues/230
return ["data", "ingest", "ml", "coordinating_only", "cluster_manager"]

@staticmethod
def get_cluster_settings(
@@ -74,6 +68,7 @@ def get_cluster_settings(

@staticmethod
def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]:
# TODO: remove in https://github.com/canonical/opensearch-operator/issues/230
"""Recompute the configuration of all the nodes (cluster set to auto-generate roles)."""
if not nodes:
return {}
@@ -86,19 +81,11 @@ def recompute_nodes_conf(app_name: str, nodes: List[Node]) -> Dict[str, Node]:
else:
# Leave node unchanged
nodes_by_name[node.name] = node
base_roles = ["data", "ingest", "ml", "coordinating_only"]
full_roles = base_roles + ["cluster_manager"]
highest_unit_number = max(node.unit_number for node in current_cluster_nodes)
for node in current_cluster_nodes:
# we do this in order to remove any non-default role / add any missing default role
if len(current_cluster_nodes) % 2 == 0 and node.unit_number == highest_unit_number:
roles = base_roles
else:
roles = full_roles

nodes_by_name[node.name] = Node(
name=node.name,
roles=roles,
# we do this in order to remove any non-default role / add any missing default role
roles=["data", "ingest", "ml", "coordinating_only", "cluster_manager"],
ip=node.ip,
app_name=node.app_name,
unit_number=node.unit_number,
110 changes: 61 additions & 49 deletions lib/charms/opensearch/v0/opensearch_base_charm.py
@@ -66,7 +66,7 @@
from charms.opensearch.v0.opensearch_fixes import OpenSearchFixes
from charms.opensearch.v0.opensearch_health import HealthColors, OpenSearchHealth
from charms.opensearch.v0.opensearch_internal_data import RelationDataStore, Scope
from charms.opensearch.v0.opensearch_locking import OpenSearchOpsLock
from charms.opensearch.v0.opensearch_locking import OpenSearchNodeLock
from charms.opensearch.v0.opensearch_nodes_exclusions import (
ALLOCS_TO_DELETE,
VOTING_TO_DELETE,
@@ -87,7 +87,6 @@
from charms.opensearch.v0.opensearch_secrets import OpenSearchSecrets
from charms.opensearch.v0.opensearch_tls import OpenSearchTLS
from charms.opensearch.v0.opensearch_users import OpenSearchUserManager
from charms.rolling_ops.v0.rollingops import RollingOpsManager
from charms.tls_certificates_interface.v3.tls_certificates import (
CertificateAvailableEvent,
)
@@ -105,7 +104,7 @@
StorageDetachingEvent,
UpdateStatusEvent,
)
from ops.framework import EventBase
from ops.framework import EventBase, EventSource
from ops.model import BlockedStatus, MaintenanceStatus, WaitingStatus
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

@@ -127,9 +126,26 @@
logger = logging.getLogger(__name__)


class _StartOpenSearch(EventBase):
"""Attempt to acquire lock & start OpenSearch.
This event will be deferred until OpenSearch starts.
"""


class _RestartOpenSearch(EventBase):
"""Attempt to acquire lock & restart OpenSearch.
This event will be deferred until OpenSearch stops. Then, `_StartOpenSearch` will be emitted.
"""


class OpenSearchBaseCharm(CharmBase):
"""Base class for OpenSearch charms."""

_start_opensearch_event = EventSource(_StartOpenSearch)
_restart_opensearch_event = EventSource(_RestartOpenSearch)

def __init__(self, *args, distro: Type[OpenSearchDistribution] = None):
super().__init__(*args)

@@ -147,7 +163,7 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None):
self.tls = OpenSearchTLS(self, TLS_RELATION)
self.status = Status(self)
self.health = OpenSearchHealth(self)
self.ops_lock = OpenSearchOpsLock(self)
self.node_lock = OpenSearchNodeLock(self)
self.cos_integration = COSAgentProvider(
self,
relation_name=COSRelationName,
@@ -161,14 +177,14 @@ def __init__(self, *args, distro: Type[OpenSearchDistribution] = None):
self.plugin_manager = OpenSearchPluginManager(self)
self.backup = OpenSearchBackup(self)

self.service_manager = RollingOpsManager(
self, relation=SERVICE_MANAGER, callback=self._start_opensearch
)
self.user_manager = OpenSearchUserManager(self)
self.opensearch_provider = OpenSearchProvider(self)
self.peer_cluster_provider = OpenSearchPeerClusterProvider(self)
self.peer_cluster_requirer = OpenSearchPeerClusterRequirer(self)

self.framework.observe(self._start_opensearch_event, self._start_opensearch)
self.framework.observe(self._restart_opensearch_event, self._restart_opensearch)

self.framework.observe(self.on.leader_elected, self._on_leader_elected)
self.framework.observe(self.on.start, self._on_start)
self.framework.observe(self.on.update_status, self._on_update_status)
@@ -271,7 +287,7 @@ def _on_start(self, event: StartEvent):

# request the start of OpenSearch
self.status.set(WaitingStatus(RequestUnitServiceOps.format("start")))
self.on[self.service_manager.name].acquire_lock.emit(callback_override="_start_opensearch")
self._start_opensearch_event.emit()

def _apply_peer_cm_directives_and_check_if_can_start(self) -> bool:
"""Apply the directives computed by the opensearch peer cluster manager."""
@@ -404,7 +420,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent):
def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # noqa: C901
"""Triggered when removing unit, Prior to the storage being detached."""
# acquire lock to ensure only 1 unit removed at a time
self.ops_lock.acquire()
if not self.node_lock.acquired:
# Raise uncaught exception to prevent Juju from removing unit
raise Exception("Unable to acquire lock: Another unit is starting or stopping.")

# if the leader is departing, and this hook fails "leader elected" won"t trigger,
# so we want to re-balance the node roles from here
@@ -446,7 +464,7 @@ def _on_opensearch_data_storage_detaching(self, _: StorageDetachingEvent): # no
raise OpenSearchHAError(ClusterHealthUnknown)
finally:
# release lock
self.ops_lock.release()
self.node_lock.release()

def _on_update_status(self, event: UpdateStatusEvent):
"""On update status event.
@@ -524,9 +542,7 @@ def _on_config_changed(self, event: ConfigChangedEvent): # noqa C901
if self.unit.is_leader():
self.status.set(MaintenanceStatus(PluginConfigCheck), app=True)
if self.plugin_manager.run():
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()
except (OpenSearchNotFullyReadyError, OpenSearchPluginError) as e:
if isinstance(e, OpenSearchNotFullyReadyError):
logger.warning("Plugin management: cluster not ready yet at config changed")
@@ -618,9 +634,7 @@ on_tls_conf_set(

# In case of renewal of the unit transport layer cert - restart opensearch
if renewal and self.is_admin_user_configured() and self.is_tls_fully_configured():
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()

def on_tls_relation_broken(self, _: RelationBrokenEvent):
"""As long as all certificates are produced, we don't do anything."""
@@ -699,8 +713,12 @@ def _handle_change_to_main_orchestrator_if_needed(

self.tls.request_new_admin_certificate()

def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
def _start_opensearch(self, event: _StartOpenSearch) -> None: # noqa: C901
"""Start OpenSearch, with a generated or passed conf, if all resources configured."""
if not self.node_lock.acquired:
logger.debug("Lock to start opensearch not acquired. Will retry next event")
event.defer()
return
self.peers_data.delete(Scope.UNIT, "started")
if self.opensearch.is_started():
try:
@@ -710,25 +728,18 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
return

if not self._can_service_start():
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
event.defer()
return

if self.peers_data.get(Scope.UNIT, "starting", False) and self.opensearch.is_failed():
self.peers_data.delete(Scope.UNIT, "starting")
if self.opensearch.is_failed():
self.node_lock.release()
self.status.set(BlockedStatus(ServiceStartError))
event.defer()
return

self.unit.status = WaitingStatus(WaitingToStart)

rel = self.model.get_relation(PeerRelationName)
for unit in rel.units.union({self.unit}):
if rel.data[unit].get("starting") == "True":
event.defer()
return

self.peers_data.put(Scope.UNIT, "starting", True)

try:
# Retrieve the nodes of the cluster, needed to configure this node
nodes = self._get_nodes(False)
@@ -739,12 +750,12 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
# Set the configuration of the node
self._set_node_conf(nodes)
except OpenSearchHttpError:
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
event.defer()
return
except OpenSearchProvidedRolesException as e:
logger.exception(e)
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
event.defer()
self.unit.status = BlockedStatus(str(e))
return
@@ -761,7 +772,7 @@ def _start_opensearch(self, event: EventBase) -> None: # noqa: C901
event.defer()
except OpenSearchStartError as e:
logger.exception(e)
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()
self.status.set(BlockedStatus(ServiceStartError))
event.defer()

@@ -788,8 +799,8 @@ def _post_start_init(self, event: EventBase):
# Remove the exclusions that could not be removed when no units were online
self.opensearch_exclusions.delete_current()

# Remove the 'starting' flag on the unit
self.peers_data.delete(Scope.UNIT, "starting")
self.node_lock.release()

self.peers_data.put(Scope.UNIT, "started", True)

# apply post_start fixes to resolve start related upstream bugs
@@ -827,18 +838,23 @@ def _stop_opensearch(self) -> None:
# 3. Remove the exclusions
self.opensearch_exclusions.delete_current()

def _restart_opensearch(self, event: EventBase) -> None:
def _restart_opensearch(self, event: _RestartOpenSearch) -> None:
"""Restart OpenSearch if possible."""
if not self.peers_data.get(Scope.UNIT, "starting", False):
try:
self._stop_opensearch()
except OpenSearchStopError as e:
logger.exception(e)
event.defer()
self.status.set(WaitingStatus(ServiceIsStopping))
return
if not self.node_lock.acquired:
logger.debug("Lock to restart opensearch not acquired. Will retry next event")
event.defer()
return

self._start_opensearch(event)
try:
self._stop_opensearch()
except OpenSearchStopError as e:
logger.exception(e)
self.node_lock.release()
event.defer()
self.status.set(WaitingStatus(ServiceIsStopping))
return

self._start_opensearch_event.emit()

def _can_service_start(self) -> bool:
"""Return if the opensearch service can start."""
@@ -917,9 +933,7 @@ def _remove_data_role_from_dedicated_cm_if_needed( # noqa: C901
return False

self.status.set(WaitingStatus(WaitingToStart))
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()
return True

def _purge_users(self):
@@ -1148,9 +1162,7 @@ def _reconfigure_and_restart_unit_if_needed(self):
return

self.status.set(WaitingStatus(WaitingToStart))
self.on[self.service_manager.name].acquire_lock.emit(
callback_override="_restart_opensearch"
)
self._restart_opensearch_event.emit()

def _recompute_roles_if_needed(self, event: RelationChangedEvent):
"""Recompute node roles:self-healing that didn't trigger leader related event occurred."""
4 changes: 0 additions & 4 deletions lib/charms/opensearch/v0/opensearch_exceptions.py
@@ -63,10 +63,6 @@ class OpenSearchNotFullyReadyError(OpenSearchError):
"""Exception thrown when a node is started but not full ready to take on requests."""


class OpenSearchOpsLockAlreadyAcquiredError(OpenSearchError):
"""Exception thrown when a node is started but not full ready to take on requests."""


class OpenSearchCmdError(OpenSearchError):
"""Exception thrown when an OpenSearch bin command fails."""

