Merge pull request #1024 from leonidc/disable_rebalance_caching_nvme_show

fix rebalance deadlock
leonidc authored Jan 13, 2025
2 parents 5bc26c0 + cf65f12 commit e7153ce
Showing 3 changed files with 30 additions and 7 deletions.
control/cephutils.py: 12 changes (9 additions & 3 deletions)
@@ -25,6 +25,7 @@ def __init__(self, config):
self.anagroup_list = []
self.rebalance_supported = False
self.rebalance_ana_group = 0
self.num_gws = 0
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
@@ -58,9 +59,12 @@ def is_rebalance_supported(self):
def get_rebalance_ana_group(self):
return self.rebalance_ana_group

def get_number_created_gateways(self, pool, group):
def get_num_gws(self):
return self.num_gws

def get_number_created_gateways(self, pool, group, caching=True):
now = time.time()
if (now - self.last_sent) < 10 and self.anagroup_list:
if caching and ((now - self.last_sent) < 10) and self.anagroup_list:
self.logger.info(f"Caching response of the monitor: {self.anagroup_list}")
return self.anagroup_list
else:
@@ -77,7 +81,9 @@ def get_number_created_gateways(self, pool, group):
data = json.loads(conv_str)
self.rebalance_supported = True
self.rebalance_ana_group = data.get("rebalance_ana_group", None)
self.logger.debug(f"Rebalance ana_group: {self.rebalance_ana_group}")
self.num_gws = data.get("num gws", None)
self.logger.info(f"Rebalance ana_group: {self.rebalance_ana_group},\
num-gws: {self.num_gws} ")
else:
self.rebalance_supported = False
pos = conv_str.find("[")
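The key change above is the optional caching flag on get_number_created_gateways(): callers that need the current monitor view can skip the 10-second response cache. Below is a minimal sketch of that pattern, assuming a simplified stand-in class (MonitorClient and _query_monitor are hypothetical; only the flag semantics mirror the patch):

import time

class MonitorClient:
    CACHE_TTL_SEC = 10  # the patch hard-codes a 10-second cache window

    def __init__(self):
        self.anagroup_list = []
        self.last_sent = time.time()

    def _query_monitor(self):
        # Stand-in for the real "ceph nvme-gw show" round trip, which also
        # yields rebalance_ana_group and the new "num gws" field.
        return [1, 2, 3]

    def get_number_created_gateways(self, caching=True):
        now = time.time()
        if caching and (now - self.last_sent) < self.CACHE_TTL_SEC \
                and self.anagroup_list:
            # Serve the cached ANA group list instead of querying the monitor.
            return self.anagroup_list
        # caching=False, or a stale/empty cache, forces a fresh query.
        self.last_sent = now
        self.anagroup_list = self._query_monitor()
        return self.anagroup_list

client = MonitorClient()
client.get_number_created_gateways()               # may serve cached data
client.get_number_created_gateways(caching=False)  # always asks the monitor

This is why grpc.py and rebalance.py below now pass False: load-balancing-group validation and rebalance decisions must not act on a view that can be up to 10 seconds stale.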
control/grpc.py: 2 changes (1 addition & 1 deletion)
@@ -1908,7 +1908,7 @@ def namespace_change_load_balancing_group_safe(self, request, context):
# the local rebalance logic.
if context:
grps_list = self.ceph_utils.get_number_created_gateways(
self.gateway_pool, self.gateway_group)
self.gateway_pool, self.gateway_group, False)
if request.anagrpid not in grps_list:
self.logger.debug(f"Load balancing groups: {grps_list}")
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group " \
control/rebalance.py: 23 changes (20 additions & 3 deletions)
@@ -30,6 +30,7 @@ def __init__(self, gateway_service):
"gateway",
"max_ns_to_change_lb_grp",
8)
self.last_scale_down_ts = time.time()
self.rebalance_event = threading.Event()
self.logger.info(f" Starting rebalance thread: period: {self.rebalance_period_sec},"
f" max number ns to move: {self.rebalance_max_ns_to_change_lb_grp}")
@@ -102,12 +103,13 @@ def find_min_loaded_group_in_subsys(self, nqn, grp_list) -> int:
# and rebalance results will be accurate. Monitor in nvme-gw show response publishes the
# index of ANA group that is currently responsible for rebalance
def rebalance_logic(self, request, context) -> int:
now = time.time()
worker_ana_group = self.ceph_utils.get_rebalance_ana_group()
self.logger.debug(f"Called rebalance logic: current rebalancing ana "
f"group {worker_ana_group}")
ongoing_scale_down_rebalance = False
grps_list = self.ceph_utils.get_number_created_gateways(self.gw_srv.gateway_pool,
self.gw_srv.gateway_group)
self.gw_srv.gateway_group, False)
if not self.ceph_utils.is_rebalance_supported():
self.logger.info("Auto rebalance is not supported with the curent ceph version")
return 1
@@ -119,6 +121,7 @@ def rebalance_logic(self, request, context) -> int:
ongoing_scale_down_rebalance = True
self.logger.info(f"Scale-down rebalance is ongoing for ANA group {ana_grp} "
f"current load {self.gw_srv.ana_grp_ns_load[ana_grp]}")
self.last_scale_down_ts = now
break
num_active_ana_groups = len(grps_list)
for ana_grp in self.gw_srv.ana_grp_state:
@@ -144,8 +147,11 @@ def rebalance_logic(self, request, context) -> int:
f"GW still appears Optimized")
return 1
else:
if not ongoing_scale_down_rebalance and \
(self.gw_srv.ana_grp_state[worker_ana_group] == pb2.ana_state.OPTIMIZED):
# keep hysteresis interval between scale-down and regular rebalance
hysteresis = 2.5 * self.rebalance_period_sec
if not ongoing_scale_down_rebalance \
and ((now - self.last_scale_down_ts) > hysteresis) \
and (self.gw_srv.ana_grp_state[worker_ana_group] == pb2.ana_state.OPTIMIZED):
# if my optimized ana group == worker-ana-group or worker-ana-group is
# also in optimized state on this GW machine
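The guard above adds a hysteresis window so the regular rebalance path does not fire immediately after a scale-down pass. Here is a sketch of the timing gate with the same 2.5 * rebalance_period_sec factor (RebalanceGate is a hypothetical name; in the patch these checks live inline in rebalance_logic()):

import time

class RebalanceGate:
    def __init__(self, rebalance_period_sec=3):
        self.rebalance_period_sec = rebalance_period_sec
        self.last_scale_down_ts = time.time()

    def note_scale_down(self):
        # Mirrors last_scale_down_ts being refreshed whenever a scale-down
        # rebalance is observed to be ongoing.
        self.last_scale_down_ts = time.time()

    def regular_rebalance_allowed(self, ongoing_scale_down):
        # Regular rebalance runs only if no scale-down is in flight AND the
        # hysteresis window since the last scale-down has fully elapsed.
        hysteresis = 2.5 * self.rebalance_period_sec
        return (not ongoing_scale_down
                and (time.time() - self.last_scale_down_ts) > hysteresis)

gate = RebalanceGate()
gate.note_scale_down()
print(gate.regular_rebalance_allowed(False))  # False until the window elapses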

@@ -182,6 +188,17 @@ def rebalance_logic(self, request, context) -> int:
f"{min_ana_grp}, load {min_load} does not "
f"fit rebalance criteria!")
continue
if ongoing_scale_down_rebalance and (num_active_ana_groups == self.ceph_utils.num_gws):
# this GW sees a scale-down condition on ana_grp but no GW is in Deleting
# state in the current mon map. Experimental code - just for logs
self.logger.info(f"Seems like scale-down deadlock on group {ana_grp}")
if (self.gw_srv.ana_grp_state[worker_ana_group]) == pb2.ana_state.OPTIMIZED:
min_ana_grp, chosen_nqn = self.find_min_loaded_group(grps_list)
if chosen_nqn != "null":
self.logger.info(f"Start rebalance (deadlock resolving) dest. ana group"
f" {min_ana_grp}, subsystem {chosen_nqn}")
# self.ns_rebalance(context, ana_grp, min_ana_grp, 1, "0")
return 0
return 1

def ns_rebalance(self, context, ana_id, dest_ana_id, num, subs_nqn) -> int:
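The final hunk adds detection of a stuck scale-down: this gateway observes a scale-down condition on some ANA group, yet the monitor map reports as many gateways as there are active ANA groups, meaning none is actually in Deleting state and the scale-down can never complete on its own. A condensed sketch of that check (the function name and parameters are illustrative, not the patch's API):

def scale_down_deadlock_suspected(ongoing_scale_down, grps_list, monitor_num_gws):
    # A scale-down appears ongoing locally, but the monitor shows no gateway
    # in Deleting state (active ANA groups == reported gateways), so the
    # scale-down cannot make progress by itself.
    return ongoing_scale_down and len(grps_list) == monitor_num_gws

# Example: 4 active ANA groups while the monitor reports 4 gateways.
print(scale_down_deadlock_suspected(True, [1, 2, 3, 4], 4))  # True

When the condition holds and the worker ANA group is OPTIMIZED on this gateway, the patch only logs the situation and picks the least-loaded destination group; the actual namespace move stays commented out, matching the "Experimental code - just for logs" note.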
