diff --git a/control/grpc.py b/control/grpc.py index aa9d9bcd..a42f904a 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -31,8 +31,9 @@ from .config import GatewayConfig from .discovery import DiscoveryService from .state import GatewayState, GatewayStateHandler, OmapLock +from collections import defaultdict -MAX_ANA_GROUPS = 32 # should match nvmeof gateway monitor ceph c++ code +MAX_ANA_GROUPS = 16 # should match nvmeof gateway monitor ceph c++ code class MonitorGroupService(monitor_pb2_grpc.MonitorGroupServicer): def __init__(self, set_group_id: Callable[[int], None]) -> None: @@ -90,6 +91,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, om self.group_id = group_id self.spdk_rpc_client = spdk_rpc_client self.gateway_name = self.config.get("gateway", "name") + self.ana_map = defaultdict(dict) + if not self.gateway_name: self.gateway_name = socket.gethostname() self._init_cluster_context() @@ -358,7 +361,10 @@ def create_subsystem_safe(self, request, context=None): max_cntlid=max_cntlid, ana_reporting = request.ana_reporting, ) - self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") + self.logger.info(f"create_subsystem and ana_map {request.subsystem_nqn}: {ret}") + for x in range (MAX_ANA_GROUPS): + self.ana_map[request.subsystem_nqn][x+1] = pb2.ana_state.INACCESSIBLE + except Exception as ex: self.logger.error(f"create_subsystem failed with: \n {ex}") if context: @@ -377,7 +383,8 @@ def create_subsystem_safe(self, request, context=None): self.logger.error(f"Error persisting create_subsystem" f" {request.subsystem_nqn}: {ex}") raise - + for x in range (MAX_ANA_GROUPS): + self.ana_map[request.subsystem_nqn][x+1] = pb2.ana_state.INACCESSIBLE return pb2.req_status(status=ret) def create_subsystem(self, request, context=None): @@ -414,7 +421,8 @@ def delete_subsystem_safe(self, request, context=None): self.logger.error(f"Error persisting delete_subsystem" f" {request.subsystem_nqn}: {ex}") raise - + if self.ana_map[request.subsystem_nqn]: + self.ana_map[request.subsystem_nqn].clear() return pb2.req_status(status=ret) def delete_subsystem(self, request, context=None): @@ -479,6 +487,9 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): prefix = f"{self.gateway_state.local.LISTENER_PREFIX}{nqn}_{self.gateway_name}_" listener_keys = [key for key in state.keys() if key.startswith(prefix)] self.logger.info(f"Iterate over {nqn=} {prefix=} {listener_keys=}") + # fill the static gateway dictinary per nqn and grp_id + for gs in nas.states: + self.ana_map[nqn][gs.grp_id] = gs.state for listener_key in listener_keys: listener = json.loads(state[listener_key]) @@ -764,12 +775,17 @@ def create_listener_safe(self, request, context=None): if enable_ha: for x in range (MAX_ANA_GROUPS): + _ana_state = "inaccessible" + if self.ana_map[request.nqn]: + _ana_state = "optimized" if self.ana_map[request.nqn][x+1] == pb2.ana_state.OPTIMIZED else "inaccessible" + self.logger.info(f"using ana_map: set listener on nqn : {request.nqn} ana state : {_ana_state} for group : {x+1}") + try: self.logger.info(f"create_listener nvmf_subsystem_listener_set_ana_state {request=} ana_state=inaccessible anagrpid={x+1}") ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( self.spdk_rpc_client, nqn=request.nqn, - ana_state="inaccessible", + ana_state= _ana_state, trtype=request.trtype, traddr=request.traddr, trsvcid=request.trsvcid,