diff --git a/control/cli.py b/control/cli.py index 178d99e94..3bbe12d57 100644 --- a/control/cli.py +++ b/control/cli.py @@ -1671,9 +1671,9 @@ def ns_change_load_balancing_group(self, args): """Change namespace load balancing group.""" out_func, err_func = self.get_output_functions(args) - if args.nsid == None and args.uuid == None: - self.cli.parser.error("At least one of --nsid or --uuid arguments is mandatory for change_load_balancing_group command") - if args.nsid != None and args.nsid <= 0: + if args.nsid == None: + self.cli.parser.error("--nsid argument is mandatory for change_load_balancing_group command") + if args.nsid <= 0: self.cli.parser.error("nsid value must be positive") if args.load_balancing_group <= 0: self.cli.parser.error("load-balancing-group value must be positive") @@ -1850,11 +1850,14 @@ def namespace(self, args): @cli.cmd() def get_subsystems(self, args): """Get subsystems""" - subsystems = json_format.MessageToJson( - self.stub.get_subsystems(pb2.get_subsystems_req()), + subsystems = self.stub.get_subsystems(pb2.get_subsystems_req()) + if args.format == "python": + return subsystems + subsystems_out = json_format.MessageToJson( + subsystems, indent=4, including_default_value_fields=True, preserving_proto_field_name=True) - self.logger.info(f"Get subsystems:\n{subsystems}") + self.logger.info(f"Get subsystems:\n{subsystems_out}") def main_common(client, args): client.logger.setLevel(GatewayEnumUtils.get_value_from_key(pb2.GwLogLevel, args.log_level.lower())) diff --git a/control/grpc.py b/control/grpc.py index d631a19bd..65af00de7 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -918,7 +918,7 @@ def namespace_add_safe(self, request, context): anagrp = 0 peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}") + self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid}, context: {context}{peer_msg}") if not request.uuid: request.uuid = str(uuid.uuid4()) @@ -1026,160 +1026,82 @@ def namespace_change_load_balancing_group_safe(self, request, context): grps_list = [] peer_msg = self.get_peer_message(context) - nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}") + change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}" + self.logger.info(f"Received request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}") + + if not request.nsid: + errmsg = f"Failure changing load balancing group for namespace in {request.subsystem_nqn}: No NSID was given" + self.logger.error(errmsg) + return pb2.req_status(status=errno.ENODEV, error_message=errmsg) grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) + if request.anagrpid not in grps_list: + self.logger.debug(f"ANA groups: {grps_list}") + errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist" + self.logger.error(errmsg) + return pb2.req_status(status=errno.ENODEV, error_message=errmsg) + omap_lock = self.omap_lock.get_omap_lock_to_use(context) with omap_lock: - if request.anagrpid not in grps_list: - self.logger.debug(f"ANA groups: {grps_list}") - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist" - self.logger.error(errmsg) - return pb2.req_status(status=errno.ENODEV, error_message=errmsg) - ns_entry = None - state = self.gateway_state.local.get_state() - if request.nsid: + if context: + # notice that the local state might not be up to date in case we're in the middle of update() but as the + # context is not None, we are not in an update(), the omap lock made sure that we got here with an updated local state + state = self.gateway_state.local.get_state() ns_key = GatewayState.build_namespace_key(request.subsystem_nqn, request.nsid) try: state_ns = state[ns_key] ns_entry = json.loads(state_ns) - assert request.nsid == ns_entry["nsid"], f'Got a namespace with NSID {ns_entry["nsid"]} which is different than the requested one {request.nsid}' - assert request.subsystem_nqn == ns_entry["subsystem_nqn"], f'Got a namespace from subsystem {ns_entry["subsystem_nqn"]} which is different than the requested one {request.subsystem_nqn}' except Exception as ex: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state" + errmsg = f"{change_lb_group_failure_prefix}: Can't find entry for namespace {request.nsid} in {request.subsystem_nqn}" self.logger.error(errmsg) - errmsg = f"{errmsg}:\n{ex}" return pb2.req_status(status=errno.ENOENT, error_message=errmsg) - elif request.uuid: - for key, val in state.items(): - if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX): - continue - try: - ns = json.loads(val) - if ns["uuid"] == request.uuid: - ns_entry = ns - break - except Exception: - self.logger.exception(f"Got exception trying to get subsystem {nqn} namespaces") - pass - else: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. At least one of NSID or UUID should be specified" - self.logger.error(errmsg) - return pb2.req_status(status=errno.EINVAL, error_message=errmsg) - if not ns_entry: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state" - self.logger.error(errmsg) - return pb2.req_status(status=errno.ENOENT, error_message=errmsg) - - if not request.nsid: - try: - request.nsid = ns_entry["nsid"] - except Exception: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace NSID" - self.logger.error(errmsg) - return pb2.req_status(status=errno.ENOENT, error_message=errmsg) - - create_image = False - ns_size = None - force = False try: - create_image = ns_entry["create_image"] - ns_size = ns_entry["size"] - force = ns_entry["force"] - except Exception: - self.logger.warning(f"Can't get all the attributes for namespace {nsid_msg}in {request.subsystem_nqn}.") - - ns_qos_entry = None - ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, request.nsid) - try: - state_ns_qos = state[ns_qos_key] - ns_qos_entry = json.loads(state_ns_qos) - except Exception: - self.logger.debug(f"No QOS limits found for namespace {nsid_msg}in {request.subsystem_nqn}") - if ns_qos_entry: - try: - if ns_qos_entry["subsystem_nqn"] != request.subsystem_nqn: - errmsg = f'Got subsystem {ns_qos_entry["subsystem_nqn"]} for QOS limits instead of {request.subsystem_nqn}, will not set QOS' - self.logger.error(errmsg) - ns_qos_entry = None - if ns_qos_entry["nsid"] != request.nsid: - errmsg = f'Got NSID {ns_qos_entry["nsid"]} for QOS limits instead of {request.nsid}, will not set QOS' - self.logger.error(errmsg) - ns_qos_entry = None - except Exception: - self.logger.error(f"Error processing QOS limits, will not set QOS") - ns_qos_entry = None - - set_qos_req = None - if ns_qos_entry: - set_qos_req = pb2.namespace_set_qos_req() - set_qos_req.subsystem_nqn = request.subsystem_nqn - set_qos_req.nsid = request.nsid - if request.uuid: - set_qos_req.uuid = request.uuid - try: - set_qos_req.rw_ios_per_second = int(ns_qos_entry["rw_ios_per_second"]) - except Exception: - self.logger.warning(f"Couldn't get QOS attribute rw_ios_per_second") - try: - set_qos_req.rw_mbytes_per_second = int(ns_qos_entry["rw_mbytes_per_second"]) - except Exception: - self.logger.warning(f"Couldn't get QOS attribute rw_mbytes_per_second") - try: - set_qos_req.r_mbytes_per_second = int(ns_qos_entry["r_mbytes_per_second"]) - except Exception: - self.logger.warning(f"Couldn't get QOS attribute r_mbytes_per_second") - try: - set_qos_req.w_mbytes_per_second = int(ns_qos_entry["w_mbytes_per_second"]) - except Exception: - self.logger.warning(f"Couldn't get QOS attribute w_mbytes_per_second") - - namespace_add_req = pb2.namespace_add_req() - namespace_add_req.subsystem_nqn = request.subsystem_nqn - namespace_add_req.nsid = request.nsid - if request.uuid: - namespace_add_req.uuid = request.uuid - namespace_add_req.anagrpid = request.anagrpid - namespace_add_req.create_image = create_image - if ns_size: - namespace_add_req.size = int(ns_size) - namespace_add_req.force = force - errmsg = None - try: - namespace_add_req.rbd_pool_name=ns_entry["rbd_pool_name"] - except KeyError: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD pool name" - try: - namespace_add_req.rbd_image_name=ns_entry["rbd_image_name"] - except KeyError: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD image name" - try: - namespace_add_req.block_size=ns_entry["block_size"] - except KeyError: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find associated block device block size" - - if errmsg: - self.logger.error(errmsg) - return pb2.req_status(status=errno.ENODEV, error_message=errmsg) - - del_req = pb2.namespace_delete_req(subsystem_nqn=request.subsystem_nqn, nsid=request.nsid, uuid=request.uuid) - ret_del = self.namespace_delete_safe(del_req, context) - if ret_del.status != 0: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't delete namespace: {ret_del.error_message}" - self.logger.error(errmsg) - return pb2.req_status(status=ret_del.status, error_message=errmsg) + ret = rpc_nvmf.nvmf_subsystem_set_ns_anagrpid( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + nsid=request.nsid, + anagrpid=request.anagrpid + ) + self.subsystem_nsid_anagrp[request.subsystem_nqn][request.nsid] = request.anagrpid + self.logger.debug(f"nvmf_subsystem_set_ns_anagrpid: {ret}") + except Exception as ex: + errmsg = f"{change_lb_group_failure_prefix}:\n{ex}" + resp = self.parse_json_exeption(ex) + status = errno.EINVAL + if resp: + status = resp["code"] + errmsg = f"{change_lb_group_failure_prefix}: {resp['message']}" + return pb2.req_status(status=status, error_message=errmsg) - ret_ns = self.namespace_add_safe(namespace_add_req, context) - if ret_ns.status != 0: - errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}" - self.logger.error(errmsg) - return pb2.req_status(status=ret_ns.status, error_message=errmsg) + # Just in case SPDK failed with no exception + if not ret: + self.logger.error(change_lb_group_failure_prefix) + return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix) - if set_qos_req: - ret_qos = self.namespace_set_qos_limits_safe(set_qos_req, context) + if context: + assert ns_entry, "Namespace entry is None for non-update call" + # Update gateway state + try: + add_req = pb2.namespace_add_req(rbd_pool_name=ns_entry["rbd_pool_name"], + rbd_image_name=ns_entry["rbd_image_name"], + subsystem_nqn=ns_entry["subsystem_nqn"], + nsid=ns_entry["nsid"], + block_size=ns_entry["block_size"], + uuid=ns_entry["uuid"], + anagrpid=request.anagrpid, + create_image=ns_entry["create_image"], + size=int(ns_entry["size"]), + force=ns_entry["force"]) + json_req = json_format.MessageToJson( + add_req, preserving_proto_field_name=True, including_default_value_fields=True) + self.gateway_state.add_namespace(request.subsystem_nqn, request.nsid, json_req) + except Exception as ex: + errmsg = f"Error persisting namespace load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}" + self.logger.exception(errmsg) + errmsg = f"{errmsg}:\n{ex}" + return pb2.req_status(status=errno.EINVAL, error_message=errmsg) return pb2.req_status(status=0, error_message=os.strerror(0)) @@ -1197,8 +1119,11 @@ def remove_namespace_from_state(self, nqn, nsid, context): # Update gateway state try: self.gateway_state.remove_namespace_qos(nqn, str(nsid)) - except Exception: - self.logger.warning(f"Error removing namespace's QOS limits, they might not have been set") + except Exception as ex: + pass + try: + self.gateway_state.remove_namespace_lb_group(nqn, str(nsid)) + except Exception as ex: pass try: self.gateway_state.remove_namespace(nqn, str(nsid)) diff --git a/control/server.py b/control/server.py index 6e246bf47..70e52bae4 100644 --- a/control/server.py +++ b/control/server.py @@ -581,3 +581,7 @@ def gateway_rpc_caller(self, requests, is_add_req): else: req = json_format.Parse(val, pb2.delete_listener_req(), ignore_unknown_fields=True) self.gateway_rpc.delete_listener(req) + elif key.startswith(GatewayState.NAMESPACE_LB_GROUP_PREFIX): + if is_add_req: + req = json_format.Parse(val, pb2.namespace_change_load_balancing_group_req(), ignore_unknown_fields=True) + self.gateway_rpc.namespace_change_load_balancing_group(req) diff --git a/control/state.py b/control/state.py index 7bad93a9f..e76fa4d64 100644 --- a/control/state.py +++ b/control/state.py @@ -16,7 +16,9 @@ from collections import defaultdict from abc import ABC, abstractmethod from .utils import GatewayLogger -import atexit +from .utils import GatewayUtils +from google.protobuf import json_format +from .proto import gateway_pb2 as pb2 class GatewayState(ABC): """Persists gateway NVMeoF target state. @@ -31,6 +33,7 @@ class GatewayState(ABC): HOST_PREFIX = "host" + OMAP_KEY_DELIMITER LISTENER_PREFIX = "listener" + OMAP_KEY_DELIMITER NAMESPACE_QOS_PREFIX = "qos" + OMAP_KEY_DELIMITER + NAMESPACE_LB_GROUP_PREFIX = "lbgroup" + OMAP_KEY_DELIMITER def is_key_element_valid(s: str) -> bool: if type(s) != str: @@ -45,6 +48,12 @@ def build_namespace_key(subsystem_nqn: str, nsid) -> str: key += GatewayState.OMAP_KEY_DELIMITER + str(nsid) return key + def build_namespace_lbgroup_key(subsystem_nqn: str, nsid) -> str: + key = GatewayState.NAMESPACE_LB_GROUP_PREFIX + subsystem_nqn + if nsid is not None: + key += GatewayState.OMAP_KEY_DELIMITER + str(nsid) + return key + def build_namespace_qos_key(subsystem_nqn: str, nsid) -> str: key = GatewayState.NAMESPACE_QOS_PREFIX + subsystem_nqn if nsid is not None: @@ -72,7 +81,7 @@ def build_listener_key_suffix(host: str, trtype: str, traddr: str, trsvcid: int) if trtype: return GatewayState.OMAP_KEY_DELIMITER + trtype + GatewayState.OMAP_KEY_DELIMITER + traddr + GatewayState.OMAP_KEY_DELIMITER + str(trsvcid) return GatewayState.OMAP_KEY_DELIMITER + traddr + GatewayState.OMAP_KEY_DELIMITER + str(trsvcid) - + def build_listener_key(subsystem_nqn: str, host: str, trtype: str, traddr: str, trsvcid: int) -> str: return GatewayState.build_partial_listener_key(subsystem_nqn, host) + GatewayState.build_listener_key_suffix(None, trtype, traddr, str(trsvcid)) @@ -665,10 +674,58 @@ def _update_caller(self, notify_event): notify_event.wait(max(update_time - time.time(), 0)) notify_event.clear() - def compare_state_values(self, val1, val2) -> bool: + def namespace_only_lb_group_id_changed(self, old_val, new_val): + old_req = None + new_req = None + try: + old_req = json_format.Parse(old_val, pb2.namespace_add_req(), ignore_unknown_fields=True) + except Exception as ex: + self.logger.exception(f"Got exception parsing {old_val}") + return (False, None) + try: + new_req = json_format.Parse(new_val, pb2.namespace_add_req(), ignore_unknown_fields=True) + except Exception as ex: + self.logger.exeption(f"Got exception parsing {new_val}") + return (False, None) + if not old_req or not new_req: + self.logger.debug(f"Failed to parse requests, old: {old_val} -> {old_req}, new: {new_val} -> {new_req}") + return (False, None) + assert old_req != new_req, f"Something was wrong we shouldn't get identical old and new values ({old_req})" + old_req.anagrpid = new_req.anagrpid + if old_req != new_req: + # Something besides the group id is different + return (False, None) + return (True, new_req.anagrpid) + + def break_namespace_key(self, ns_key: str): + if not ns_key.startswith(GatewayState.NAMESPACE_PREFIX): + self.logger.warning(f"Invalid namespace key \"{ns_key}\", can't find key parts") + return (None, None) + key_end = ns_key[len(GatewayState.NAMESPACE_PREFIX) : ] + key_parts = key_end.split(GatewayState.OMAP_KEY_DELIMITER) + if len(key_parts) != 2: + self.logger.warning(f"Invalid namespace key \"{ns_key}\", can't find key parts") + return (None, None) + if not GatewayUtils.is_valid_nqn(key_parts[0]): + self.logger.warning(f"Invalid NQN \"{key_parts[0]}\" found for namespace key \"{ns_key}\", can't find key parts") + return (None, None) + nqn = key_parts[0] + try: + nsid = int(key_parts[1]) + except Exception as ex: + self.logger.warning(f"Invalid NSID \"{key_parts[1]}\" found for namespace key \"{ns_key}\", can't find key parts") + return (None, None) + + return (nqn, nsid) + + def get_str_from_bytes(val): + val_str = val.decode() if type(val) == type(b'') else val + return val_str + + def compare_state_values(val1, val2) -> bool: # We sometimes get one value as type bytes and the other as type str, so convert them both to str for the comparison - val1_str = val1.decode() if type(val1) == type(b'') else val1 - val2_str = val2.decode() if type(val2) == type(b'') else val2 + val1_str = GatewayStateHandler.get_str_from_bytes(val1) + val2_str = GatewayStateHandler.get_str_from_bytes(val2) return val1_str == val2_str def update(self) -> bool: @@ -710,10 +767,49 @@ def update(self) -> bool: changed = { key: omap_state_dict[key] for key in same_keys - if not self.compare_state_values(local_state_dict[key], omap_state_dict[key]) + if not GatewayStateHandler.compare_state_values(local_state_dict[key], omap_state_dict[key]) } grouped_changed = self._group_by_prefix(changed, prefix_list) + # Handle namespace changes in which only the load balancing group id was changed + only_lb_group_changed = [] + for key in changed.keys(): + if not key.startswith(GatewayState.NAMESPACE_PREFIX): + continue + try: + (should_process, new_lb_grp_id) = self.namespace_only_lb_group_id_changed(local_state_dict[key], + omap_state_dict[key]) + if should_process: + assert new_lb_grp_id, "Shouldn't get here with en empty lb group id" + self.logger.debug(f"Found {key} where only the load balancing group id has changed. The new group id is {new_lb_grp_id}") + only_lb_group_changed.insert(0, (key, new_lb_grp_id)) + except Exception as ex: + self.logger.warning("Got exception checking namespace for load balancing group id change") + + for ns_key, new_lb_grp in only_lb_group_changed: + ns_nqn = None + ns_nsid = None + try: + changed.pop(ns_key) + (ns_nqn, ns_nsid) = self.break_namespace_key(ns_key) + except Exception as ex: + self.logger.error(f"Exception removing {ns_key} from {changed}:\n{ex}") + if ns_nqn and ns_nsid: + try: + lbgroup_key = GatewayState.build_namespace_lbgroup_key(ns_nqn, ns_nsid) + req = pb2.namespace_change_load_balancing_group_req(subsystem_nqn=ns_nqn, nsid=ns_nsid, + anagrpid=new_lb_grp) + json_req = json_format.MessageToJson(req, preserving_proto_field_name=True, + including_default_value_fields=True) + added[lbgroup_key] = json_req + except Exception as ex: + self.logger.error(f"Exception formatting change namespace load balancing group request:\n{ex}") + + if len(only_lb_group_changed) > 0: + grouped_changed = self._group_by_prefix(changed, prefix_list) + prefix_list += [GatewayState.NAMESPACE_LB_GROUP_PREFIX] + grouped_added = self._group_by_prefix(added, prefix_list) + # Find OMAP removals removed_keys = local_state_keys - omap_state_keys removed = {key: local_state_dict[key] for key in removed_keys} diff --git a/tests/test_cli.py b/tests/test_cli.py index e4579f94a..a4ce93ddd 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -361,7 +361,7 @@ def test_add_namespace(self, caplog, gateway): assert f'"uuid": "{uuid}"' in caplog.text caplog.clear() cli(["namespace", "change_load_balancing_group", "--subsystem", subsystem, "--nsid", nsid, "--load-balancing-group", "10"]) - assert f"Failure changing load balancing group for namespace using NSID {nsid}" in caplog.text + assert f"Failure changing load balancing group for namespace with NSID {nsid} in {subsystem}" in caplog.text assert f"Load balancing group 10 doesn't exist" in caplog.text caplog.clear() cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GB", "--rbd-create-image"]) @@ -822,7 +822,6 @@ def test_remove_namespace(self, caplog, gateway): caplog.clear() cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "6"]) assert f"Deleting namespace 6 from {subsystem}: Successful" in caplog.text - assert f"Error removing namespace's QOS limits, they might not have been set" not in caplog.text bdev_found = False bdev_list = rpc_bdev.bdev_get_bdevs(gw.spdk_rpc_client) for b in bdev_list: @@ -836,7 +835,6 @@ def test_remove_namespace(self, caplog, gateway): caplog.clear() cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "2"]) assert f"Deleting namespace 2 from {subsystem}: Successful" in caplog.text - assert f"Error removing namespace's QOS limits, they might not have been set" in caplog.text caplog.clear() cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "4"]) assert f"Deleting namespace 4 from {subsystem}: Successful" in caplog.text diff --git a/tests/test_cli_change_lb.py b/tests/test_cli_change_lb.py index 59989e22e..37cc6427a 100644 --- a/tests/test_cli_change_lb.py +++ b/tests/test_cli_change_lb.py @@ -1,7 +1,9 @@ import pytest from control.server import GatewayServer from control.cli import main as cli +from control.cli import main_test as cli_test from control.cephutils import CephUtils +import spdk.rpc.nvmf as rpc_nvmf import grpc from control.proto import gateway_pb2 as pb2 from control.proto import gateway_pb2_grpc as pb2_grpc @@ -9,12 +11,14 @@ import time image = "mytestdevimage" -image2 = "mytestdevimage2" pool = "rbd" subsystem = "nqn.2016-06.io.spdk:cnode1" anagrpid = "1" anagrpid2 = "2" +uuid = "9dee1f89-e950-4a2f-b984-244ea73f1851" +uuid2 = "9dee1f89-e950-4a2f-b984-244ea73f1852" config = "ceph-nvmeof.conf" +namespace_count = 20 @pytest.fixture(scope="module") def two_gateways(config): @@ -37,10 +41,9 @@ def two_gateways(config): configB.config["gateway"]["name"] = nameB configB.config["gateway"]["override_hostname"] = nameB configB.config["spdk"]["rpc_socket_name"] = sockB - portB = portA + 2 + portB = portA + 1 discPortB = discPortA + 1 configB.config["gateway"]["port"] = str(portB) - discPort = configB.getint("discovery", "port") + 1 configB.config["discovery"]["port"] = str(discPortB) configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02" @@ -56,54 +59,190 @@ def two_gateways(config): channelB = grpc.insecure_channel(f"{addr}:{portB}") stubB = pb2_grpc.GatewayStub(channelB) - yield gatewayA.gateway_rpc, stubA, gatewayB.gateway_rpc, stubB + yield gatewayA, stubA, gatewayB, stubB gatewayA.gateway_rpc.gateway_state.delete_state() gatewayB.gateway_rpc.gateway_state.delete_state() gatewayA.server.stop(grace=1) gatewayB.server.stop(grace=1) +def verify_one_namespace_lb_group(caplog, gw_port, subsys, nsid_to_verify, grp): + caplog.clear() + cli(["--server-port", gw_port, "--format", "json", "namespace", "list", "--subsystem", subsys, "--nsid", nsid_to_verify]) + assert f'"nsid": {nsid_to_verify},' in caplog.text + assert f'"load_balancing_group": {grp},' in caplog.text + +def verify_namespaces_using_get_subsystems(caplog, gw_port, subsys, first_nsid, last_nsid, grp): + caplog.clear() + subsys_info = cli_test(["--server-port", gw_port, "get_subsystems"]) + assert len(subsys_info.subsystems) == 1 + assert subsys_info.subsystems[0].nqn == subsys + assert len(subsys_info.subsystems[0].namespaces) >= last_nsid + for ns in range(first_nsid, last_nsid + 1): + assert subsys_info.subsystems[0].namespaces[ns - 1].nsid == ns + assert subsys_info.subsystems[0].namespaces[ns - 1].anagrpid == grp + +def verify_namespaces_using_spdk_get_subsystems(caplog, gw, subsys, first_nsid, last_nsid, grp): + caplog.clear() + subsys_info = rpc_nvmf.nvmf_get_subsystems(gw.gateway_rpc.spdk_rpc_client) + assert len(subsys_info) == 1 + assert subsys_info[0]["nqn"] == subsys + assert len(subsys_info[0]["namespaces"]) >= last_nsid + for ns in range(first_nsid, last_nsid + 1): + assert subsys_info[0]["namespaces"][ns - 1]["nsid"] == ns + assert subsys_info[0]["namespaces"][ns - 1]["anagrpid"] == grp + +def create_namespaces(caplog, ns_count, subsys): + for i in range(1, 1 + (ns_count // 2)): + caplog.clear() + cli(["--server-port", "5501", "namespace", "add", "--subsystem", subsys, "--rbd-pool", pool, "--rbd-image", f"{image}{i}", "--size", "16MB", "--rbd-create-image", "--load-balancing-group", anagrpid]) + assert f"Adding namespace {i} to {subsys}: Successful" in caplog.text + for i in range(1 + (ns_count // 2), 1 + ns_count): + caplog.clear() + cli(["--server-port", "5501", "namespace", "add", "--subsystem", subsys, "--rbd-pool", pool, "--rbd-image", f"{image}{i}", "--size", "16MB", "--rbd-create-image", "--load-balancing-group", anagrpid2]) + assert f"Adding namespace {i} to {subsys}: Successful" in caplog.text + time.sleep(10) + for i in range(1, 1 + (ns_count // 2)): + verify_one_namespace_lb_group(caplog, "5501", subsys, f"{i}", anagrpid) + verify_one_namespace_lb_group(caplog, "5502", subsys, f"{i}", anagrpid) + for i in range(1 + (ns_count // 2), 1 + ns_count): + verify_one_namespace_lb_group(caplog, "5501", subsys, f"{i}", anagrpid2) + verify_one_namespace_lb_group(caplog, "5502", subsys, f"{i}", anagrpid2) + +def change_one_namespace_lb_group(caplog, subsys, nsid_to_change, new_group): + caplog.clear() + cli(["--server-port", "5501", "namespace", "change_load_balancing_group", "--subsystem", subsys, "--nsid", nsid_to_change, "--load-balancing-group", new_group]) + time.sleep(10) + assert f"Changing load balancing group of namespace {nsid_to_change} in {subsys} to {new_group}: Successful" in caplog.text + assert f"Received request to change load balancing group for namespace with NSID {nsid_to_change} in {subsys} to {new_group}, context: