diff --git a/control/cephutils.py b/control/cephutils.py
index ded2a424..a58fe05d 100644
--- a/control/cephutils.py
+++ b/control/cephutils.py
@@ -25,6 +25,7 @@ def __init__(self, config):
         self.anagroup_list = []
         self.rebalance_supported = False
         self.rebalance_ana_group = 0
+        self.num_gws = 0
         self.last_sent = time.time()
 
     def execute_ceph_monitor_command(self, cmd):
@@ -58,9 +59,12 @@ def is_rebalance_supported(self):
     def get_rebalance_ana_group(self):
         return self.rebalance_ana_group
 
-    def get_number_created_gateways(self, pool, group):
+    def get_num_gws(self):
+        return self.num_gws
+
+    def get_number_created_gateways(self, pool, group, caching=True):
         now = time.time()
-        if (now - self.last_sent) < 10 and self.anagroup_list:
+        if caching and ((now - self.last_sent) < 10) and self.anagroup_list:
             self.logger.info(f"Caching response of the monitor: {self.anagroup_list}")
             return self.anagroup_list
         else:
@@ -77,7 +81,9 @@ def get_number_created_gateways(self, pool, group):
                     data = json.loads(conv_str)
                     self.rebalance_supported = True
                     self.rebalance_ana_group = data.get("rebalance_ana_group", None)
-                    self.logger.debug(f"Rebalance ana_group: {self.rebalance_ana_group}")
+                    self.num_gws = data.get("num gws", None)
+                    self.logger.info(f"Rebalance ana_group: {self.rebalance_ana_group},\
+                        num-gws: {self.num_gws} ")
                 else:
                     self.rebalance_supported = False
                 pos = conv_str.find("[")
diff --git a/control/grpc.py b/control/grpc.py
index a41fb240..a508125e 100644
--- a/control/grpc.py
+++ b/control/grpc.py
@@ -1908,7 +1908,7 @@ def namespace_change_load_balancing_group_safe(self, request, context):
         # the local rebalance logic.
         if context:
             grps_list = self.ceph_utils.get_number_created_gateways(
-                self.gateway_pool, self.gateway_group)
+                self.gateway_pool, self.gateway_group, False)
             if request.anagrpid not in grps_list:
                 self.logger.debug(f"Load balancing groups: {grps_list}")
                 errmsg = f"{change_lb_group_failure_prefix}: Load balancing group " \
diff --git a/control/rebalance.py b/control/rebalance.py
index 840ad4b1..c1d6f9fe 100755
--- a/control/rebalance.py
+++ b/control/rebalance.py
@@ -30,6 +30,7 @@ def __init__(self, gateway_service):
             "gateway",
             "max_ns_to_change_lb_grp",
             8)
+        self.last_scale_down_ts = time.time()
         self.rebalance_event = threading.Event()
         self.logger.info(f" Starting rebalance thread: period: {self.rebalance_period_sec},"
                          f" max number ns to move: {self.rebalance_max_ns_to_change_lb_grp}")
@@ -102,12 +103,13 @@ def find_min_loaded_group_in_subsys(self, nqn, grp_list) -> int:
     # and reballance results will be accurate. Monitor in nvme-gw show response publishes the
     # index of ANA group that is currently responsible for rebalance
     def rebalance_logic(self, request, context) -> int:
+        now = time.time()
         worker_ana_group = self.ceph_utils.get_rebalance_ana_group()
         self.logger.debug(f"Called rebalance logic: current rebalancing ana "
                           f"group {worker_ana_group}")
         ongoing_scale_down_rebalance = False
         grps_list = self.ceph_utils.get_number_created_gateways(self.gw_srv.gateway_pool,
-                                                                self.gw_srv.gateway_group)
+                                                                self.gw_srv.gateway_group, False)
         if not self.ceph_utils.is_rebalance_supported():
             self.logger.info("Auto rebalance is not supported with the curent ceph version")
             return 1
@@ -119,6 +121,7 @@ def rebalance_logic(self, request, context) -> int:
                 ongoing_scale_down_rebalance = True
                 self.logger.info(f"Scale-down rebalance is ongoing for ANA group {ana_grp} "
                                  f"current load {self.gw_srv.ana_grp_ns_load[ana_grp]}")
+                self.last_scale_down_ts = now
                 break
         num_active_ana_groups = len(grps_list)
         for ana_grp in self.gw_srv.ana_grp_state:
@@ -144,8 +147,11 @@ def rebalance_logic(self, request, context) -> int:
                                          f"GW still appears Optimized")
                     return 1
                 else:
-                    if not ongoing_scale_down_rebalance and \
-                            (self.gw_srv.ana_grp_state[worker_ana_group] == pb2.ana_state.OPTIMIZED):
+                    # keep hysteresis interval between scale-down and regular rebalance
+                    hysteresis = 2.5 * self.rebalance_period_sec
+                    if not ongoing_scale_down_rebalance \
+                            and ((now - self.last_scale_down_ts) > hysteresis) \
+                            and (self.gw_srv.ana_grp_state[worker_ana_group] == pb2.ana_state.OPTIMIZED):
 
                         # if my optimized ana group == worker-ana-group or worker-ana-group is
                         # also in optimized state on this GW machine
@@ -182,6 +188,17 @@ def rebalance_logic(self, request, context) -> int:
                                          f"{min_ana_grp}, load {min_load} does not "
                                          f"fit rebalance criteria!")
                         continue
+        if ongoing_scale_down_rebalance and (num_active_ana_groups == self.ceph_utils.num_gws):
+            # this GW feels scale_down condition on ana_grp but no GW in Deleting
+            # state in the current mon.map . Experimental code - just for logs
+            self.logger.info(f"Seems like scale-down deadlock on group {ana_grp}")
+            if (self.gw_srv.ana_grp_state[worker_ana_group]) == pb2.ana_state.OPTIMIZED:
+                min_ana_grp, chosen_nqn = self.find_min_loaded_group(grps_list)
+                if chosen_nqn != "null":
+                    self.logger.info(f"Start rebalance (deadlock resolving) dest. ana group"
+                                     f" {min_ana_grp}, subsystem {chosen_nqn}")
+                    # self.ns_rebalance(context, ana_grp, min_ana_grp, 1, "0")
+                    return 0
         return 1
 
     def ns_rebalance(self, context, ana_id, dest_ana_id, num, subs_nqn) -> int:
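
The most consequential piece of the rebalance.py change is the timing gate: regular rebalance is suppressed while a scale-down rebalance is ongoing and for a hysteresis window of 2.5 rebalance periods after the last one was observed. Below is a minimal, self-contained sketch of that gating logic so it can be reasoned about (and unit-tested) in isolation. The RebalanceGate class and its method names are hypothetical illustrations, not part of control/rebalance.py; only the last_scale_down_ts bookkeeping and the 2.5x multiplier mirror the diff, and the Ceph monitor and ANA-state checks are deliberately left out.

import time


class RebalanceGate:
    """Sketch of the hysteresis gating introduced in rebalance_logic().

    Hypothetical helper for illustration only: it records the timestamp of the
    last observed scale-down rebalance and allows regular rebalance only after
    a quiet window of 2.5 rebalance periods, as in the diff.
    """

    def __init__(self, rebalance_period_sec):
        self.rebalance_period_sec = rebalance_period_sec
        # Start the clock at construction time, like last_scale_down_ts in __init__.
        self.last_scale_down_ts = time.time()

    def note_scale_down(self, now=None):
        # Called whenever an ongoing scale-down rebalance is detected.
        self.last_scale_down_ts = time.time() if now is None else now

    def regular_rebalance_allowed(self, now=None):
        # Regular rebalance may run only once the hysteresis window has elapsed,
        # so it cannot race with a scale-down that just finished or is still draining.
        now = time.time() if now is None else now
        hysteresis = 2.5 * self.rebalance_period_sec
        return (now - self.last_scale_down_ts) > hysteresis


if __name__ == "__main__":
    gate = RebalanceGate(rebalance_period_sec=4)
    t0 = time.time()
    gate.note_scale_down(now=t0)
    print(gate.regular_rebalance_allowed(now=t0 + 5))   # False: still inside the 10 s window
    print(gate.regular_rebalance_allowed(now=t0 + 11))  # True: hysteresis elapsed

The same freshness concern motivates the new caching flag: both rebalance_logic() and namespace_change_load_balancing_group_safe() now call get_number_created_gateways(..., False), so decisions are taken against the monitor's current gateway map rather than a response cached for up to 10 seconds.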