From 364cddc93406e2f3ce78cd74c2ba7d5fdbc02708 Mon Sep 17 00:00:00 2001 From: Leonid Chernin Date: Fri, 8 Mar 2024 06:41:22 +0000 Subject: [PATCH] create namespace with automatic allocation of load balancing group Signed-off-by: Leonid Chernin fix parsing of anagrp list fix when created NS only 1st gw chooses the lb group other take it from the request support create namespace with lb and without lb parameter - automatic allocation lb group fixing CR comments env changes in logic , fixing test_grpc_py --- .env | 3 +++ control/cephutils.py | 52 +++++++++++++++++++++++++++++++++++----- control/cli.py | 9 ++++--- control/grpc.py | 39 +++++++++++++++++++++++++++--- tests/test_cli.py | 32 ++++++++++++------------- tests/test_grpc.py | 2 +- tests/test_namespaces.py | 1 + 7 files changed, 107 insertions(+), 31 deletions(-) diff --git a/.env b/.env index f7594ec92..29fee4887 100644 --- a/.env +++ b/.env @@ -58,8 +58,11 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9" # Ceph Cluster CEPH_CLUSTER_VERSION="${CEPH_VERSION}" + CEPH_BRANCH=wip-baum-20240403-01 CEPH_SHA=12ef028e30def7f77ac937868a61e7891cb9e8f2 + + CEPH_VSTART_ARGS="--memstore" CEPH_DEVEL_MGR_PATH=../ceph diff --git a/control/cephutils.py b/control/cephutils.py index cd1c40caa..928ed2d94 100644 --- a/control/cephutils.py +++ b/control/cephutils.py @@ -11,6 +11,7 @@ import errno import rbd import rados +import time from .utils import GatewayLogger class CephUtils: @@ -21,14 +22,54 @@ def __init__(self, config): self.logger = GatewayLogger(config).logger self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf") self.rados_id = config.get_with_default("ceph", "id", "") + self.anagroup_list = [] + self.last_sent = time.time() + + def execute_ceph_monitor_command(self, cmd): + with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster: + rply = cluster.mon_command(cmd, b'') + return rply + + def get_number_created_gateways(self, pool, group): + now = time.time() + if (now - self.last_sent) < 10 and self.anagroup_list : + self.logger.info(f" Caching response of the monitor: {self.anagroup_list} ") + return self.anagroup_list + else : + try: + self.anagroup_list = [] + self.last_sent = now + str = '{' + f'"prefix":"nvme-gw show", "pool":"{pool}", "group":"{group}"' + '}' + self.logger.info(f"nvme-show string: {str} ") + rply = self.execute_ceph_monitor_command(str) + self.logger.info(f"reply \"{rply}\"") + conv_str = rply[1].decode() + pos = conv_str.find("[") + if pos!= -1: + new_str = conv_str[pos+ len("[") :] + pos = new_str.find("]") + new_str = new_str[: pos].strip() + int_str_list = new_str.split(' ') + self.logger.info(f"new_str : {new_str}") + for x in int_str_list: + self.anagroup_list.append(int(x)) + self.logger.info(self.anagroup_list) + else: + self.logger.info("Gws not found") + + except Exception: + self.logger.exception(f"Failure get number created gateways:") + self.anagroup_list = [] + pass + + return self.anagroup_list def fetch_and_display_ceph_version(self): try: - with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster: - rply = cluster.mon_command('{"prefix":"mon versions"}', b'') - ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"') - ceph_ver = ceph_ver.removeprefix("ceph version ") - self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"") + rply = self.execute_ceph_monitor_command('{"prefix":"mon versions"}') + ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"') + ceph_ver = ceph_ver.removeprefix("ceph version ") + self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"") except Exception: self.logger.exception(f"Failure fetching Ceph version:") pass @@ -90,4 +131,3 @@ def create_image(self, pool_name, image_name, size) -> bool: raise rc_ex return rc - diff --git a/control/cli.py b/control/cli.py index 1db719258..f669e418b 100644 --- a/control/cli.py +++ b/control/cli.py @@ -1258,10 +1258,9 @@ def ns_add(self, args): args.block_size = 512 if args.block_size <= 0: self.cli.parser.error("block-size value must be positive") - if args.load_balancing_group == None: - args.load_balancing_group = 1 - if args.load_balancing_group <= 0: - self.cli.parser.error("load-balancing-group value must be positive") + + if args.load_balancing_group < 0: + self.cli.parser.error("load-balancing-group value must be positive") if args.nsid != None and args.nsid <= 0: self.cli.parser.error("nsid value must be positive") if args.rbd_create_image: @@ -1795,7 +1794,7 @@ def ns_set_qos(self, args): argument("--rbd-image", "-i", help="RBD image name", required=True), argument("--rbd-create-image", "-c", help="Create RBD image if needed", action='store_true', required=False), argument("--block-size", "-s", help="Block size", type=int), - argument("--load-balancing-group", "-l", help="Load balancing group", type=int), + argument("--load-balancing-group", "-l", help="Load balancing group", type=int, default=0), argument("--size", help="Size in bytes or specified unit (KB, KiB, MB, MiB, GB, GiB, TB, TiB)"), argument("--force", help="Create a namespace even its image is already used by another namespace", action='store_true', required=False), ] diff --git a/control/grpc.py b/control/grpc.py index b10f69eb6..7c3325e64 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -169,7 +169,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp self.bdev_params = {} self.subsystem_nsid_bdev = defaultdict(dict) self.subsystem_nsid_anagrp = defaultdict(dict) - self.gateway_group = self.config.get("gateway", "group") + self.gateway_group = self.config.get_with_default("gateway", "group", "") + self.gateway_pool = self.config.get_with_default("ceph", "pool", "") self._init_cluster_context() self.subsys_ha = {} self.subsys_max_ns = {} @@ -871,12 +872,35 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None): return pb2.req_status(status=ret_recycle , error_message=errmsg) return pb2.req_status(status=True) + def choose_anagrpid_for_namespace(self, nsid) ->int: + grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group) + for ana_grp in grps_list: + if not self.clusters[ana_grp]: # still no namespaces in this ana-group - probably the new GW added + self.logger.info(f"New GW created: chosen ana group {ana_grp} for ns {nsid} ") + return ana_grp + #not found ana_grp .To calulate it. Find minimum loaded ana_grp cluster + ana_load = {} + min_load = 2000 + chosen_ana_group = 0 + for ana_grp in self.clusters: + ana_load[ana_grp] = 0; + for name in self.clusters[ana_grp]: + ana_load[ana_grp] += self.clusters[ana_grp][name] # accumulate the total load per ana group for all ana_grp clusters + for ana_grp in ana_load : + self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} ") + if ana_load[ana_grp] <= min_load: + min_load = ana_load[ana_grp] + chosen_ana_group = ana_grp + self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} set as min {min_load} ") + self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ") + return chosen_ana_group + def namespace_add_safe(self, request, context): """Adds a namespace to a subsystem.""" peer_msg = self.get_peer_message(context) nsid_msg = self.get_ns_id_message(request.nsid, request.uuid) - self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, context: {context}{peer_msg}") + self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}") if not request.uuid: request.uuid = str(uuid.uuid4()) @@ -897,7 +921,16 @@ def namespace_add_safe(self, request, context): create_image = request.create_image if not context: create_image = False - anagrp = int(request.anagrpid) if request.anagrpid is not None else 0 + else: # new namespace + if request.anagrpid == 0: + anagrp = self.choose_anagrpid_for_namespace(request.nsid) + # if anagrp == 0: + # errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}" + # self.logger.error(errmsg) + # return pb2.req_status(status=errno.EINVAL, error_message=errmsg) + request.anagrpid = anagrp + + anagrp = request.anagrpid ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name, request.rbd_image_name, request.block_size, create_image, request.size, peer_msg) if ret_bdev.status != 0: diff --git a/tests/test_cli.py b/tests/test_cli.py index daabd5243..9e50eb486 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -243,20 +243,20 @@ def test_add_namespace_wrong_block_size(self, caplog, gateway): def test_add_namespace(self, caplog, gateway): caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"RBD pool junk doesn't exist" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1", "--force"]) assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text assert "Allocated cluster name='cluster_context_1_0'" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--load-balancing-group", "1", "--force"]) assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text assert f"Can't create RBD image {image}" in caplog.text caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB", "--load-balancing-group", "1"]) except SystemExit as sysex: rc = int(str(sysex)) pass @@ -265,7 +265,7 @@ def test_add_namespace(self, caplog, gateway): caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image", "--load-balancing-group", "1"]) except SystemExit as sysex: rc = int(str(sysex)) pass @@ -274,7 +274,7 @@ def test_add_namespace(self, caplog, gateway): caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--load-balancing-group", "1", "--rbd-create-image"]) except SystemExit as sysex: rc = int(str(sysex)) pass @@ -283,14 +283,14 @@ def test_add_namespace(self, caplog, gateway): caplog.clear() rc = 0 try: - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--load-balancing-group", "1", "--rbd-create-image"]) except SystemExit as sysex: rc = int(str(sysex)) pass assert "must be numeric" in caplog.text assert rc == 2 caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024", "--load-balancing-group", "1"]) assert f"Adding namespace 2 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "1"]) @@ -316,7 +316,7 @@ def test_add_namespace(self, caplog, gateway): cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid]) assert f'"load_balancing_group": {anagrpid2}' in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"Adding namespace 3 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"]) @@ -324,13 +324,13 @@ def test_add_namespace(self, caplog, gateway): def test_add_namespace_ipv6(self, caplog, gateway): caplog.clear() - cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--force"]) + cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1","--force"]) assert f"Adding namespace 4 to {subsystem}, load balancing group 1: Successful" in caplog.text assert f'will continue as the "force" argument was used' in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"]) assert '"load_balancing_group": 1' in caplog.text - cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--force"]) + cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1", "--force"]) assert f"Adding namespace 8 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "8"]) @@ -339,18 +339,18 @@ def test_add_namespace_ipv6(self, caplog, gateway): def test_add_namespace_same_image(self, caplog, gateway): caplog.clear() img_name = f"{image}_test" - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"Adding namespace 5 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"]) assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text assert f"you can find the offending namespace by using" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1"]) assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text assert f"you can find the offending namespace by using" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1", "--force"]) assert f"Adding namespace 6 to {subsystem}, load balancing group 1: Successful" in caplog.text assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text assert f'will continue as the "force" argument was used' in caplog.text @@ -434,7 +434,7 @@ def test_resize_namespace(self, caplog, gateway): cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", nsid, "--size", "128MiB"]) assert f"Failure resizing namespace using NSID {nsid} on {subsystem}: Can't find namespace" in caplog.text caplog.clear() - cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force", "--load-balancing-group", "1"]) assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text caplog.clear() cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", "3", "--size", "6GiB"]) diff --git a/tests/test_grpc.py b/tests/test_grpc.py index 188a30ff8..1dba4d316 100644 --- a/tests/test_grpc.py +++ b/tests/test_grpc.py @@ -14,7 +14,7 @@ def create_resource_by_index(i): subsystem = f"{subsystem_prefix}{i}" cli(["subsystem", "add", "--subsystem", subsystem]) - cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image", "--force"]) + cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image","--load-balancing-group", "1", "--force"]) def check_resource_by_index(i, caplog): subsystem = f"{subsystem_prefix}{i}" diff --git a/tests/test_namespaces.py b/tests/test_namespaces.py index 065fea15c..6a7dcc58e 100644 --- a/tests/test_namespaces.py +++ b/tests/test_namespaces.py @@ -72,6 +72,7 @@ def create_namespace(stub, rbd_pool, rbd_image, nsid): rbd_image_name=rbd_image, nsid=nsid, block_size=4096, + anagrpid=1, force=True) ret_namespace = stub.namespace_add(namespace_req) if ret_namespace.status != errno.ETIMEDOUT: