From 1830497fbf4dd51a0b7aa260c7d957194df1b63d Mon Sep 17 00:00:00 2001 From: Gil Bregman Date: Wed, 10 Jul 2024 17:24:31 +0300 Subject: [PATCH] Change namespace balancing group using a direct SPDK call. Fixes #754 Signed-off-by: Gil Bregman --- control/cli.py | 6 +- control/grpc.py | 187 +++++++++++----------------------------------- control/server.py | 7 ++ control/state.py | 31 +++++++- tests/test_cli.py | 2 - 5 files changed, 83 insertions(+), 150 deletions(-) diff --git a/control/cli.py b/control/cli.py index d82f385bd..3c23448e2 100644 --- a/control/cli.py +++ b/control/cli.py @@ -1671,9 +1671,9 @@ def ns_change_load_balancing_group(self, args): """Change namespace load balancing group.""" out_func, err_func = self.get_output_functions(args) - if args.nsid == None and args.uuid == None: - self.cli.parser.error("At least one of --nsid or --uuid arguments is mandatory for change_load_balancing_group command") - if args.nsid != None and args.nsid <= 0: + if args.nsid == None: + self.cli.parser.error("--nsid argument is mandatory for change_load_balancing_group command") + if args.nsid <= 0: self.cli.parser.error("nsid value must be positive") if args.load_balancing_group <= 0: self.cli.parser.error("load-balancing-group value must be positive") diff --git a/control/grpc.py b/control/grpc.py index d631a19bd..931127a52 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -1026,160 +1026,56 @@ def namespace_change_load_balancing_group_safe(self, request, context): grps_list = [] peer_msg = self.get_peer_message(context) - nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}") + change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}" + self.logger.info(f"Received request to change load balancing group for 
namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}") + + if not request.nsid: + errmsg = f"Failure changing load balancing group for namespace in {request.subsystem_nqn}: No NSID was given" + self.logger.error(errmsg) + return pb2.req_status(status=errno.ENODEV, error_message=errmsg) grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) omap_lock = self.omap_lock.get_omap_lock_to_use(context) with omap_lock: if request.anagrpid not in grps_list: self.logger.debug(f"ANA groups: {grps_list}") - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist" + errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist" self.logger.error(errmsg) return pb2.req_status(status=errno.ENODEV, error_message=errmsg) - ns_entry = None - state = self.gateway_state.local.get_state() - if request.nsid: - ns_key = GatewayState.build_namespace_key(request.subsystem_nqn, request.nsid) - try: - state_ns = state[ns_key] - ns_entry = json.loads(state_ns) - assert request.nsid == ns_entry["nsid"], f'Got a namespace with NSID {ns_entry["nsid"]} which is different than the requested one {request.nsid}' - assert request.subsystem_nqn == ns_entry["subsystem_nqn"], f'Got a namespace from subsystem {ns_entry["subsystem_nqn"]} which is different than the requested one {request.subsystem_nqn}' - except Exception as ex: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. 
Can't get namespace entry from local state" - self.logger.error(errmsg) - errmsg = f"{errmsg}:\n{ex}" - return pb2.req_status(status=errno.ENOENT, error_message=errmsg) - elif request.uuid: - for key, val in state.items(): - if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX): - continue - try: - ns = json.loads(val) - if ns["uuid"] == request.uuid: - ns_entry = ns - break - except Exception: - self.logger.exception(f"Got exception trying to get subsystem {nqn} namespaces") - pass - else: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. At least one of NSID or UUID should be specified" - self.logger.error(errmsg) - return pb2.req_status(status=errno.EINVAL, error_message=errmsg) - - if not ns_entry: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state" - self.logger.error(errmsg) - return pb2.req_status(status=errno.ENOENT, error_message=errmsg) - - if not request.nsid: - try: - request.nsid = ns_entry["nsid"] - except Exception: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. 
Can't get namespace NSID" - self.logger.error(errmsg) - return pb2.req_status(status=errno.ENOENT, error_message=errmsg) - - create_image = False - ns_size = None - force = False - try: - create_image = ns_entry["create_image"] - ns_size = ns_entry["size"] - force = ns_entry["force"] - except Exception: - self.logger.warning(f"Can't get all the attributes for namespace {nsid_msg}in {request.subsystem_nqn}.") - - ns_qos_entry = None - ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, request.nsid) - try: - state_ns_qos = state[ns_qos_key] - ns_qos_entry = json.loads(state_ns_qos) - except Exception: - self.logger.debug(f"No QOS limits found for namespace {nsid_msg}in {request.subsystem_nqn}") - if ns_qos_entry: - try: - if ns_qos_entry["subsystem_nqn"] != request.subsystem_nqn: - errmsg = f'Got subsystem {ns_qos_entry["subsystem_nqn"]} for QOS limits instead of {request.subsystem_nqn}, will not set QOS' - self.logger.error(errmsg) - ns_qos_entry = None - if ns_qos_entry["nsid"] != request.nsid: - errmsg = f'Got NSID {ns_qos_entry["nsid"]} for QOS limits instead of {request.nsid}, will not set QOS' - self.logger.error(errmsg) - ns_qos_entry = None - except Exception: - self.logger.error(f"Error processing QOS limits, will not set QOS") - ns_qos_entry = None - - set_qos_req = None - if ns_qos_entry: - set_qos_req = pb2.namespace_set_qos_req() - set_qos_req.subsystem_nqn = request.subsystem_nqn - set_qos_req.nsid = request.nsid - if request.uuid: - set_qos_req.uuid = request.uuid - try: - set_qos_req.rw_ios_per_second = int(ns_qos_entry["rw_ios_per_second"]) - except Exception: - self.logger.warning(f"Couldn't get QOS attribute rw_ios_per_second") - try: - set_qos_req.rw_mbytes_per_second = int(ns_qos_entry["rw_mbytes_per_second"]) - except Exception: - self.logger.warning(f"Couldn't get QOS attribute rw_mbytes_per_second") - try: - set_qos_req.r_mbytes_per_second = int(ns_qos_entry["r_mbytes_per_second"]) - except Exception: - 
self.logger.warning(f"Couldn't get QOS attribute r_mbytes_per_second") - try: - set_qos_req.w_mbytes_per_second = int(ns_qos_entry["w_mbytes_per_second"]) - except Exception: - self.logger.warning(f"Couldn't get QOS attribute w_mbytes_per_second") - - namespace_add_req = pb2.namespace_add_req() - namespace_add_req.subsystem_nqn = request.subsystem_nqn - namespace_add_req.nsid = request.nsid - if request.uuid: - namespace_add_req.uuid = request.uuid - namespace_add_req.anagrpid = request.anagrpid - namespace_add_req.create_image = create_image - if ns_size: - namespace_add_req.size = int(ns_size) - namespace_add_req.force = force - errmsg = None - try: - namespace_add_req.rbd_pool_name=ns_entry["rbd_pool_name"] - except KeyError: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD pool name" - try: - namespace_add_req.rbd_image_name=ns_entry["rbd_image_name"] - except KeyError: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD image name" try: - namespace_add_req.block_size=ns_entry["block_size"] - except KeyError: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find associated block device block size" - - if errmsg: - self.logger.error(errmsg) - return pb2.req_status(status=errno.ENODEV, error_message=errmsg) - - del_req = pb2.namespace_delete_req(subsystem_nqn=request.subsystem_nqn, nsid=request.nsid, uuid=request.uuid) - ret_del = self.namespace_delete_safe(del_req, context) - if ret_del.status != 0: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. 
Can't delete namespace: {ret_del.error_message}" - self.logger.error(errmsg) - return pb2.req_status(status=ret_del.status, error_message=errmsg) + ret = rpc_nvmf.nvmf_subsystem_set_ns_anagrpid( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + nsid=request.nsid, + anagrpid=request.anagrpid + ) + self.logger.debug(f"nvmf_subsystem_set_ns_anagrpid: {ret}") + except Exception as ex: + errmsg = f"{change_lb_group_failure_prefix}:\n{ex}" + resp = self.parse_json_exeption(ex) + status = errno.EINVAL + if resp: + status = resp["code"] + errmsg = f"{change_lb_group_failure_prefix}: {resp['message']}" + return pb2.req_status(status=status, error_message=errmsg) - ret_ns = self.namespace_add_safe(namespace_add_req, context) - if ret_ns.status != 0: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}" - self.logger.error(errmsg) - return pb2.req_status(status=ret_ns.status, error_message=errmsg) + # Just in case SPDK failed with no exception + if not ret: + self.logger.error(change_lb_group_failure_prefix) + return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix) - if set_qos_req: - ret_qos = self.namespace_set_qos_limits_safe(set_qos_req, context) + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True, including_default_value_fields=True) + self.gateway_state.add_namespace_lb_group(request.subsystem_nqn, request.nsid, json_req) + except Exception as ex: + errmsg = f"Error persisting namespace load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}" + self.logger.exception(errmsg) + errmsg = f"{errmsg}:\n{ex}" + return pb2.req_status(status=errno.EINVAL, error_message=errmsg) return pb2.req_status(status=0, error_message=os.strerror(0)) @@ -1197,8 +1093,11 @@ def remove_namespace_from_state(self, nqn, nsid, context): # Update gateway state try: 
self.gateway_state.remove_namespace_qos(nqn, str(nsid)) - except Exception: - self.logger.warning(f"Error removing namespace's QOS limits, they might not have been set") + except Exception as ex: + pass + try: + self.gateway_state.remove_namespace_lb_group(nqn, str(nsid)) + except Exception as ex: pass try: self.gateway_state.remove_namespace(nqn, str(nsid)) diff --git a/control/server.py b/control/server.py index 6e246bf47..e904d2a17 100644 --- a/control/server.py +++ b/control/server.py @@ -567,6 +567,13 @@ def gateway_rpc_caller(self, requests, is_add_req): else: # Do nothing, this is covered by the delete namespace code pass + elif key.startswith(GatewayState.NAMESPACE_LB_GROUP_PREFIX): + if is_add_req: + req = json_format.Parse(val, pb2.namespace_change_load_balancing_group_req(), ignore_unknown_fields=True) + self.gateway_rpc.namespace_change_load_balancing_group(req) + else: + # Do nothing, this is covered by the delete namespace code + pass elif key.startswith(GatewayState.HOST_PREFIX): if is_add_req: req = json_format.Parse(val, pb2.add_host_req(), ignore_unknown_fields=True) diff --git a/control/state.py b/control/state.py index 7bad93a9f..81f571b49 100644 --- a/control/state.py +++ b/control/state.py @@ -31,6 +31,7 @@ class GatewayState(ABC): HOST_PREFIX = "host" + OMAP_KEY_DELIMITER LISTENER_PREFIX = "listener" + OMAP_KEY_DELIMITER NAMESPACE_QOS_PREFIX = "qos" + OMAP_KEY_DELIMITER + NAMESPACE_LB_GROUP_PREFIX = "lbgroup" + OMAP_KEY_DELIMITER def is_key_element_valid(s: str) -> bool: if type(s) != str: @@ -51,6 +52,12 @@ def build_namespace_qos_key(subsystem_nqn: str, nsid) -> str: key += GatewayState.OMAP_KEY_DELIMITER + str(nsid) return key + def build_namespace_lb_group_key(subsystem_nqn: str, nsid) -> str: + key = GatewayState.NAMESPACE_LB_GROUP_PREFIX + subsystem_nqn + if nsid is not None: + key += GatewayState.OMAP_KEY_DELIMITER + str(nsid) + return key + def build_subsystem_key(subsystem_nqn: str) -> str: return GatewayState.SUBSYSTEM_PREFIX + 
subsystem_nqn @@ -111,6 +118,16 @@ def remove_namespace_qos(self, subsystem_nqn: str, nsid: str): key = GatewayState.build_namespace_qos_key(subsystem_nqn, nsid) self._remove_key(key) + def add_namespace_lb_group(self, subsystem_nqn: str, nsid: str, val: str): + """Adds namespace's load balancing group to the state data store.""" + key = GatewayState.build_namespace_lb_group_key(subsystem_nqn, nsid) + self._add_key(key, val) + + def remove_namespace_lb_group(self, subsystem_nqn: str, nsid: str): + """Removes namespace's load balancing group from the state data store.""" + key = GatewayState.build_namespace_lb_group_key(subsystem_nqn, nsid) + self._remove_key(key) + def add_subsystem(self, subsystem_nqn: str, val: str): """Adds a subsystem to the state data store.""" key = GatewayState.build_subsystem_key(subsystem_nqn) @@ -600,6 +617,16 @@ def remove_namespace_qos(self, subsystem_nqn: str, nsid: str): self.omap.remove_namespace_qos(subsystem_nqn, nsid) self.local.remove_namespace_qos(subsystem_nqn, nsid) + def add_namespace_lb_group(self, subsystem_nqn: str, nsid: str, val: str): + """Adds namespace's load balancing group to the state data store.""" + self.omap.add_namespace_lb_group(subsystem_nqn, nsid, val) + self.local.add_namespace_lb_group(subsystem_nqn, nsid, val) + + def remove_namespace_lb_group(self, subsystem_nqn: str, nsid: str): + """Removes namespace's load balancing group from the state data store.""" + self.omap.remove_namespace_lb_group(subsystem_nqn, nsid) + self.local.remove_namespace_lb_group(subsystem_nqn, nsid) + def add_subsystem(self, subsystem_nqn: str, val: str): """Adds a subsystem to the state data store.""" self.omap.add_subsystem(subsystem_nqn, val) @@ -685,8 +712,10 @@ def update(self) -> bool: with self.update_is_active_lock: prefix_list = [ GatewayState.SUBSYSTEM_PREFIX, - GatewayState.NAMESPACE_PREFIX, GatewayState.HOST_PREFIX, + GatewayState.NAMESPACE_PREFIX, + GatewayState.HOST_PREFIX, GatewayState.NAMESPACE_QOS_PREFIX, +
GatewayState.NAMESPACE_LB_GROUP_PREFIX, GatewayState.LISTENER_PREFIX ] diff --git a/tests/test_cli.py b/tests/test_cli.py index e4579f94a..de59e6b19 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -822,7 +822,6 @@ def test_remove_namespace(self, caplog, gateway): caplog.clear() cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "6"]) assert f"Deleting namespace 6 from {subsystem}: Successful" in caplog.text - assert f"Error removing namespace's QOS limits, they might not have been set" not in caplog.text bdev_found = False bdev_list = rpc_bdev.bdev_get_bdevs(gw.spdk_rpc_client) for b in bdev_list: @@ -836,7 +835,6 @@ def test_remove_namespace(self, caplog, gateway): caplog.clear() cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "2"]) assert f"Deleting namespace 2 from {subsystem}: Successful" in caplog.text - assert f"Error removing namespace's QOS limits, they might not have been set" in caplog.text caplog.clear() cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "4"]) assert f"Deleting namespace 4 from {subsystem}: Successful" in caplog.text