From b4028c32d4659b3791a49228795daa2be40b19fe Mon Sep 17 00:00:00 2001 From: Leonid Chernin Date: Fri, 8 Mar 2024 06:41:22 +0000 Subject: [PATCH] create namespace with automatic allocation of load balancing group Signed-off-by: Leonid Chernin fix parsing of anagrp list fix when created NS only 1st gw chooses the lb group other take it from the request support create namespace with lb and without lb parameter - automatic allocation lb group fixing CR comments env changes in logic , fixing test_grpc_py --- .env | 3 +++ control/cephutils.py | 52 +++++++++++++++++++++++++++++++++++----- control/cli.py | 9 ++++--- control/grpc.py | 39 +++++++++++++++++++++++++++--- tests/test_cli.py | 32 ++++++++++++------------- tests/test_grpc.py | 2 +- tests/test_namespaces.py | 1 + 7 files changed, 107 insertions(+), 31 deletions(-) diff --git a/.env b/.env index f7594ec9..29fee488 100644 --- a/.env +++ b/.env @@ -58,8 +58,11 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9" # Ceph Cluster CEPH_CLUSTER_VERSION="${CEPH_VERSION}" + CEPH_BRANCH=wip-baum-20240403-01 CEPH_SHA=12ef028e30def7f77ac937868a61e7891cb9e8f2 + + CEPH_VSTART_ARGS="--memstore" CEPH_DEVEL_MGR_PATH=../ceph diff --git a/control/cephutils.py b/control/cephutils.py index cd1c40ca..928ed2d9 100644 --- a/control/cephutils.py +++ b/control/cephutils.py @@ -11,6 +11,7 @@ import errno import rbd import rados +import time from .utils import GatewayLogger class CephUtils: @@ -21,14 +22,54 @@ def __init__(self, config): self.logger = GatewayLogger(config).logger self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf") self.rados_id = config.get_with_default("ceph", "id", "") + self.anagroup_list = [] + self.last_sent = time.time() + + def execute_ceph_monitor_command(self, cmd): + with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster: + rply = cluster.mon_command(cmd, b'') + return rply + + def get_number_created_gateways(self, pool, group): + now = time.time() + if (now - self.last_sent) < 10 and self.anagroup_list : + self.logger.info(f" Caching response of the monitor: {self.anagroup_list} ") + return self.anagroup_list + else : + try: + self.anagroup_list = [] + self.last_sent = now + str = '{' + f'"prefix":"nvme-gw show", "pool":"{pool}", "group":"{group}"' + '}' + self.logger.info(f"nvme-show string: {str} ") + rply = self.execute_ceph_monitor_command(str) + self.logger.info(f"reply \"{rply}\"") + conv_str = rply[1].decode() + pos = conv_str.find("[") + if pos!= -1: + new_str = conv_str[pos+ len("[") :] + pos = new_str.find("]") + new_str = new_str[: pos].strip() + int_str_list = new_str.split(' ') + self.logger.info(f"new_str : {new_str}") + for x in int_str_list: + self.anagroup_list.append(int(x)) + self.logger.info(self.anagroup_list) + else: + self.logger.info("Gws not found") + + except Exception: + self.logger.exception(f"Failure get number created gateways:") + self.anagroup_list = [] + pass + + return self.anagroup_list def fetch_and_display_ceph_version(self): try: - with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster: - rply = cluster.mon_command('{"prefix":"mon versions"}', b'') - ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"') - ceph_ver = ceph_ver.removeprefix("ceph version ") - self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"") + rply = self.execute_ceph_monitor_command('{"prefix":"mon versions"}') + ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"') + ceph_ver = ceph_ver.removeprefix("ceph version ") + self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"") except Exception: self.logger.exception(f"Failure fetching Ceph version:") pass @@ -90,4 +131,3 @@ def create_image(self, pool_name, image_name, size) -> bool: raise rc_ex return rc - diff --git a/control/cli.py b/control/cli.py index 1db71925..f669e418 100644 --- a/control/cli.py +++ b/control/cli.py @@ -1258,10 +1258,9 @@ def ns_add(self, args): args.block_size = 512 if args.block_size <= 0: self.cli.parser.error("block-size value must be positive") - if args.load_balancing_group == None: - args.load_balancing_group = 1 - if args.load_balancing_group <= 0: - self.cli.parser.error("load-balancing-group value must be positive") + + if args.load_balancing_group < 0: + self.cli.parser.error("load-balancing-group value must be positive") if args.nsid != None and args.nsid <= 0: self.cli.parser.error("nsid value must be positive") if args.rbd_create_image: @@ -1795,7 +1794,7 @@ def ns_set_qos(self, args): argument("--rbd-image", "-i", help="RBD image name", required=True), argument("--rbd-create-image", "-c", help="Create RBD image if needed", action='store_true', required=False), argument("--block-size", "-s", help="Block size", type=int), - argument("--load-balancing-group", "-l", help="Load balancing group", type=int), + argument("--load-balancing-group", "-l", help="Load balancing group", type=int, default=0), argument("--size", help="Size in bytes or specified unit (KB, KiB, MB, MiB, GB, GiB, TB, TiB)"), argument("--force", help="Create a namespace even its image is already used by another namespace", action='store_true', required=False), ] diff --git a/control/grpc.py b/control/grpc.py index b10f69eb..a7c37a26 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -169,7 +169,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp self.bdev_params = {} self.subsystem_nsid_bdev = defaultdict(dict) self.subsystem_nsid_anagrp = defaultdict(dict) - self.gateway_group = self.config.get("gateway", "group") + self.gateway_group = self.config.get_with_default("gateway", "group", "") + self.gateway_pool = self.config.get_with_default("ceph", "pool", "") self._init_cluster_context() self.subsys_ha = {} self.subsys_max_ns = {} @@ -871,12 +872,35 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): return pb2.req_status(status=ret_recycle , error_message=errmsg) return pb2.req_status(status=True) + def choose_anagrpid_for_namespace(self, nsid) ->int: + grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) + for ana_grp in grps_list: + if not self.clusters[ana_grp]: # still no namespaces in this ana-group - probably the new GW added + self.logger.info(f"New GW created: chosen ana group {ana_grp} for ns {nsid} ") + return ana_grp + #not found ana_grp .To calulate it. Find minimum loaded ana_grp cluster + ana_load = {} + min_load = 2000 + chosen_ana_group = 0 + for ana_grp in self.clusters: + ana_load[ana_grp] = 0; + for name in self.clusters[ana_grp]: + ana_load[ana_grp] += self.clusters[ana_grp][name] # accumulate the total load per ana group for all ana_grp clusters + for ana_grp in ana_load : + self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} ") + if ana_load[ana_grp] <= min_load: + min_load = ana_load[ana_grp] + chosen_ana_group = ana_grp + self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} set as min {min_load} ") + self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ") + return chosen_ana_group + def namespace_add_safe(self, request, context): """Adds a namespace to a subsystem.""" peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, context: {context}{peer_msg}") + self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}") if not request.uuid: request.uuid = str(uuid.uuid4()) @@ -897,7 +921,16 @@ def namespace_add_safe(self, request, context): create_image = request.create_image if not context: create_image = False - anagrp = int(request.anagrpid) if request.anagrpid is not None else 0 + else: # new namespace + if request.anagrpid == 0: + anagrp = self.choose_anagrpid_for_namespace(request.nsid) + # if anagrp == 0: + # errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}" + # self.logger.error(errmsg) + # return pb2.req_status(status=errno.EINVAL, error_message=errmsg) + request.anagrpid = anagrp + + anagrp = request.anagrpid ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name, request.rbd_image_name, request.block_size, create_image, request.size, peer_msg) if ret_bdev.status != 0: diff --git a/tests/test_cli.py b/tests/test_cli.py index daabd524..9e50eb48 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -243,20 +243,20 @@ def test_add_namespace_wrong_block_size(self, caplog, gateway): def test_add_namespace(self, caplog, gateway): caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"RBD pool junk doesn't exist" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1", "--force"]) assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text assert "Allocated cluster name='cluster_context_1_0'" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--load-balancing-group", "1", "--force"]) assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text assert f"Can't create RBD image {image}" in caplog.text caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB", "--load-balancing-group", "1"]) except SystemExit as sysex: rc = int(str(sysex)) pass @@ -265,7 +265,7 @@ def test_add_namespace(self, caplog, gateway): caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image", "--load-balancing-group", "1"]) except SystemExit as sysex: rc = int(str(sysex)) pass @@ -274,7 +274,7 @@ def test_add_namespace(self, caplog, gateway): caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--load-balancing-group", "1", "--rbd-create-image"]) except SystemExit as sysex: rc = int(str(sysex)) pass @@ -283,14 +283,14 @@ def test_add_namespace(self, caplog, gateway): caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--load-balancing-group", "1", "--rbd-create-image"]) except SystemExit as sysex: rc = int(str(sysex)) pass assert "must be numeric" in caplog.text assert rc == 2 caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024", "--load-balancing-group", "1"]) assert f"Adding namespace 2 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "1"]) @@ -316,7 +316,7 @@ def test_add_namespace(self, caplog, gateway): cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid]) assert f'"load_balancing_group": {anagrpid2}' in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"Adding namespace 3 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"]) @@ -324,13 +324,13 @@ def test_add_namespace(self, caplog, gateway): def test_add_namespace_ipv6(self, caplog, gateway): caplog.clear() - cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--force"]) + cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1","--force"]) assert f"Adding namespace 4 to {subsystem}, load balancing group 1: Successful" in caplog.text assert f'will continue as the "force" argument was used' in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"]) assert '"load_balancing_group": 1' in caplog.text - cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--force"]) + cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1", "--force"]) assert f"Adding namespace 8 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "8"]) @@ -339,18 +339,18 @@ def test_add_namespace_ipv6(self, caplog, gateway): def test_add_namespace_same_image(self, caplog, gateway): caplog.clear() img_name = f"{image}_test" - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"Adding namespace 5 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text assert f"you can find the offending namespace by using" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1"]) assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text assert f"you can find the offending namespace by using" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1", "--force"]) assert f"Adding namespace 6 to {subsystem}, load balancing group 1: Successful" in caplog.text assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text assert f'will continue as the "force" argument was used' in caplog.text @@ -434,7 +434,7 @@ def test_resize_namespace(self, caplog, gateway): cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", nsid, "--size", "128MiB"]) assert f"Failure resizing namespace using NSID {nsid} on {subsystem}: Can't find namespace" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force", "--load-balancing-group", "1"]) assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", "3", "--size", "6GiB"]) diff --git a/tests/test_grpc.py b/tests/test_grpc.py index 188a30ff..1dba4d31 100644 --- a/tests/test_grpc.py +++ b/tests/test_grpc.py @@ -14,7 +14,7 @@ def create_resource_by_index(i): subsystem = f"{subsystem_prefix}{i}" cli(["subsystem", "add", "--subsystem", subsystem]) - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image", "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image","--load-balancing-group", "1", "--force"]) def check_resource_by_index(i, caplog): subsystem = f"{subsystem_prefix}{i}" diff --git a/tests/test_namespaces.py b/tests/test_namespaces.py index 065fea15..6a7dcc58 100644 --- a/tests/test_namespaces.py +++ b/tests/test_namespaces.py @@ -72,6 +72,7 @@ def create_namespace(stub, rbd_pool, rbd_image, nsid): rbd_image_name=rbd_image, nsid=nsid, block_size=4096, + anagrpid=1, force=True) ret_namespace = stub.namespace_add(namespace_req) if ret_namespace.status != errno.ETIMEDOUT: