Skip to content

Commit

Permalink
Merge pull request #491 from leonidc/leo_devel
Browse files Browse the repository at this point in the history
create namespace with automatic allocation of load balancing group
  • Loading branch information
leonidc authored Apr 10, 2024
2 parents 466573f + b4028c3 commit 8287623
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9"

# Ceph Cluster
CEPH_CLUSTER_VERSION="${CEPH_VERSION}"

CEPH_BRANCH=wip-baum-20240403-01
CEPH_SHA=12ef028e30def7f77ac937868a61e7891cb9e8f2


CEPH_VSTART_ARGS="--memstore"
CEPH_DEVEL_MGR_PATH=../ceph

Expand Down
52 changes: 46 additions & 6 deletions control/cephutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import errno
import rbd
import rados
import time
from .utils import GatewayLogger

class CephUtils:
Expand All @@ -21,14 +22,54 @@ def __init__(self, config):
self.logger = GatewayLogger(config).logger
self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf")
self.rados_id = config.get_with_default("ceph", "id", "")
self.anagroup_list = []
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
    """Send *cmd* to the Ceph monitors and return the reply unchanged.

    A ``rados.Rados`` handle is built from the configured conf file and
    client id and used as a context manager for the duration of the call.
    The command is sent with an empty input buffer; the value handed back
    is exactly what ``mon_command`` returned.
    """
    connection = rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id)
    with connection as cluster:
        reply = cluster.mon_command(cmd, b'')
    return reply

def get_number_created_gateways(self, pool, group):
    """Return the ANA group ids listed in the monitor's ``nvme-gw show`` reply.

    The ids are read as whitespace-separated integers found between the
    first ``[`` of the reply text and the ``]`` that follows it.  A
    non-empty result is cached for 10 seconds per (pool, group) pair so a
    burst of calls does not query the monitor each time.

    Args:
        pool: pool name passed to ``nvme-gw show``.
        group: gateway group name passed to ``nvme-gw show``.

    Returns:
        A list of ints.  It is empty when the reply carries no bracketed
        list, when the list is empty or unterminated, or when the query
        fails (failures are logged with a traceback, never raised).
    """
    import json  # function-scope import: only this method needs it

    now = time.time()
    request_key = (pool, group)
    cache_is_fresh = (now - self.last_sent) < 10
    # getattr(): the key attribute only exists once a query has been made
    same_request = getattr(self, "_anagroup_request", None) == request_key
    if cache_is_fresh and same_request and self.anagroup_list:
        self.logger.info(f" Caching response of the monitor: {self.anagroup_list} ")
        return self.anagroup_list

    try:
        self.anagroup_list = []
        self.last_sent = now
        self._anagroup_request = request_key
        # json.dumps escapes quotes/backslashes in the names, which manual
        # string concatenation did not.
        cmd = json.dumps({"prefix": "nvme-gw show", "pool": pool, "group": group})
        self.logger.info(f"nvme-show string: {cmd} ")
        rply = self.execute_ceph_monitor_command(cmd)
        self.logger.info(f"reply \"{rply}\"")
        reply_text = rply[1].decode()
        _, opening, tail = reply_text.partition("[")
        inner, closing, _ = tail.partition("]")
        if opening and closing:
            inner = inner.strip()
            self.logger.info(f"new_str : {inner}")
            # split() without an argument tolerates repeated blanks and an
            # empty list, where split(' ') produced '' tokens that int() rejects
            self.anagroup_list = [int(token) for token in inner.split()]
            self.logger.info(self.anagroup_list)
        else:
            # no '[' at all, or a list that is never closed
            self.logger.info("Gws not found")
    except Exception:
        # best effort: report the failure and fall back to "no groups known"
        self.logger.exception("Failure get number created gateways:")
        self.anagroup_list = []

    return self.anagroup_list

def fetch_and_display_ceph_version(self):
    """Log the Ceph version reported by the ``mon versions`` monitor command.

    Best effort: any failure while querying or parsing is logged with its
    traceback and swallowed, so the caller is never interrupted by it.
    Returns nothing.
    """
    try:
        # Single query through the shared helper (the direct rados connection
        # that used to be duplicated here is gone).
        rply = self.execute_ceph_monitor_command('{"prefix":"mon versions"}')
        # Take the text before the first ':' of the reply body, then strip
        # the opening brace, the surrounding quotes and the "ceph version "
        # prefix from it.
        first_key = rply[1].decode().removeprefix("{").strip().split(":")[0]
        ceph_ver = first_key.removeprefix('"').removesuffix('"')
        ceph_ver = ceph_ver.removeprefix("ceph version ")
        self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"")
    except Exception:
        self.logger.exception("Failure fetching Ceph version:")
Expand Down Expand Up @@ -90,4 +131,3 @@ def create_image(self, pool_name, image_name, size) -> bool:
raise rc_ex

return rc

9 changes: 4 additions & 5 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1258,10 +1258,9 @@ def ns_add(self, args):
args.block_size = 512
if args.block_size <= 0:
self.cli.parser.error("block-size value must be positive")
if args.load_balancing_group == None:
args.load_balancing_group = 1
if args.load_balancing_group <= 0:
self.cli.parser.error("load-balancing-group value must be positive")

if args.load_balancing_group < 0:
self.cli.parser.error("load-balancing-group value must be positive")
if args.nsid != None and args.nsid <= 0:
self.cli.parser.error("nsid value must be positive")
if args.rbd_create_image:
Expand Down Expand Up @@ -1795,7 +1794,7 @@ def ns_set_qos(self, args):
argument("--rbd-image", "-i", help="RBD image name", required=True),
argument("--rbd-create-image", "-c", help="Create RBD image if needed", action='store_true', required=False),
argument("--block-size", "-s", help="Block size", type=int),
argument("--load-balancing-group", "-l", help="Load balancing group", type=int),
argument("--load-balancing-group", "-l", help="Load balancing group", type=int, default=0),
argument("--size", help="Size in bytes or specified unit (KB, KiB, MB, MiB, GB, GiB, TB, TiB)"),
argument("--force", help="Create a namespace even its image is already used by another namespace", action='store_true', required=False),
]
Expand Down
39 changes: 36 additions & 3 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.bdev_params = {}
self.subsystem_nsid_bdev = defaultdict(dict)
self.subsystem_nsid_anagrp = defaultdict(dict)
self.gateway_group = self.config.get("gateway", "group")
self.gateway_group = self.config.get_with_default("gateway", "group", "")
self.gateway_pool = self.config.get_with_default("ceph", "pool", "")
self._init_cluster_context()
self.subsys_ha = {}
self.subsys_max_ns = {}
Expand Down Expand Up @@ -871,12 +872,35 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
return pb2.req_status(status=ret_recycle , error_message=errmsg)
return pb2.req_status(status=True)

def choose_anagrpid_for_namespace(self, nsid) -> int:
    """Pick the ANA group in which a new namespace should be created.

    The groups reported by ``ceph_utils.get_number_created_gateways`` are
    checked first: the first one that has no entries in ``self.clusters``
    is returned immediately.  Otherwise the values stored under every
    group in ``self.clusters`` are summed as that group's load and the
    least loaded group is returned; on a tie the group seen last wins.

    Args:
        nsid: namespace id, used only in the log messages.

    Returns:
        The chosen ANA group id, or 0 when ``self.clusters`` is empty and no
        reported group qualified.
    """
    grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
    for ana_grp in grps_list:
        # .get(): a group missing from self.clusters has no namespaces
        # either, and probing it must neither fail nor insert an entry
        if not self.clusters.get(ana_grp):  # still no namespaces in this ana-group - probably a new GW was added
            self.logger.info(f"New GW created: chosen ana group {ana_grp} for ns {nsid} ")
            return ana_grp

    # No empty ana group found: calculate the total load per ana group and
    # pick the least loaded one.
    ana_load = {
        ana_grp: sum(per_cluster.values())
        for ana_grp, per_cluster in self.clusters.items()
    }
    min_load = None  # no fixed upper bound, so arbitrarily large loads still compare
    chosen_ana_group = 0
    for ana_grp, load in ana_load.items():
        self.logger.info(f" ana group {ana_grp} load = {load} ")
        # '<=' keeps the tie-break: the last group with the minimal load wins
        if min_load is None or load <= min_load:
            min_load = load
            chosen_ana_group = ana_grp
            self.logger.info(f" ana group {ana_grp} load = {load} set as min {min_load} ")
    self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ")
    return chosen_ana_group

def namespace_add_safe(self, request, context):
"""Adds a namespace to a subsystem."""

peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, context: {context}{peer_msg}")
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")

if not request.uuid:
request.uuid = str(uuid.uuid4())
Expand All @@ -897,7 +921,16 @@ def namespace_add_safe(self, request, context):
create_image = request.create_image
if not context:
create_image = False
anagrp = int(request.anagrpid) if request.anagrpid is not None else 0
else: # new namespace
if request.anagrpid == 0:
anagrp = self.choose_anagrpid_for_namespace(request.nsid)
# if anagrp == 0:
# errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}"
# self.logger.error(errmsg)
# return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
request.anagrpid = anagrp

anagrp = request.anagrpid
ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
request.rbd_image_name, request.block_size, create_image, request.size, peer_msg)
if ret_bdev.status != 0:
Expand Down
32 changes: 16 additions & 16 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,20 @@ def test_add_namespace_wrong_block_size(self, caplog, gateway):

def test_add_namespace(self, caplog, gateway):
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"RBD pool junk doesn't exist" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert "Allocated cluster name='cluster_context_1_0'" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--load-balancing-group", "1", "--force"])
assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text
assert f"Can't create RBD image {image}" in caplog.text
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB", "--load-balancing-group", "1"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -265,7 +265,7 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image", "--load-balancing-group", "1"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -274,7 +274,7 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--load-balancing-group", "1", "--rbd-create-image"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -283,14 +283,14 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--load-balancing-group", "1", "--rbd-create-image"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
assert "must be numeric" in caplog.text
assert rc == 2
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024", "--load-balancing-group", "1"])
assert f"Adding namespace 2 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "1"])
Expand All @@ -316,21 +316,21 @@ def test_add_namespace(self, caplog, gateway):
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid])
assert f'"load_balancing_group": {anagrpid2}' in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"Adding namespace 3 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"])
assert '"rbd_image_size": "4294967296"' in caplog.text

def test_add_namespace_ipv6(self, caplog, gateway):
caplog.clear()
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--force"])
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1","--force"])
assert f"Adding namespace 4 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert f'will continue as the "force" argument was used' in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"])
assert '"load_balancing_group": 1' in caplog.text
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--force"])
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 8 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "8"])
Expand All @@ -339,18 +339,18 @@ def test_add_namespace_ipv6(self, caplog, gateway):
def test_add_namespace_same_image(self, caplog, gateway):
caplog.clear()
img_name = f"{image}_test"
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"Adding namespace 5 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f"you can find the offending namespace by using" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1"])
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f"you can find the offending namespace by using" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 6 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f'will continue as the "force" argument was used' in caplog.text
Expand Down Expand Up @@ -434,7 +434,7 @@ def test_resize_namespace(self, caplog, gateway):
cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", nsid, "--size", "128MiB"])
assert f"Failure resizing namespace using NSID {nsid} on {subsystem}: Can't find namespace" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force", "--load-balancing-group", "1"])
assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", "3", "--size", "6GiB"])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
def create_resource_by_index(i):
    """Create subsystem number *i* and add one namespace to it.

    The namespace is added with an explicit load balancing group of 1 and
    with ``--force``; its RBD image is created if needed with a size of
    16MiB.
    """
    subsystem = f"{subsystem_prefix}{i}"
    cli(["subsystem", "add", "--subsystem", subsystem])
    # Exactly one namespace per subsystem: only the call that names the load
    # balancing group is kept.
    cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1", "--force"])

def check_resource_by_index(i, caplog):
subsystem = f"{subsystem_prefix}{i}"
Expand Down
1 change: 1 addition & 0 deletions tests/test_namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def create_namespace(stub, rbd_pool, rbd_image, nsid):
rbd_image_name=rbd_image,
nsid=nsid,
block_size=4096,
anagrpid=1,
force=True)
ret_namespace = stub.namespace_add(namespace_req)
if ret_namespace.status != errno.ETIMEDOUT:
Expand Down

0 comments on commit 8287623

Please sign in to comment.