Skip to content

Commit

Permalink
create namespace with automatic allocation of load balancing group
Browse files Browse the repository at this point in the history
Signed-off-by: Leonid Chernin <[email protected]>

fix parsing of the ANA group list
fix: when a namespace is created, only the first gateway chooses the load-balancing group; the other gateways take it from the request
support creating a namespace with or without the load-balancing-group parameter (when it is omitted, the group is allocated automatically)
address code-review comments
update .env
logic changes; fix test_grpc.py
  • Loading branch information
Leonid Chernin committed Apr 9, 2024
1 parent 466573f commit b4028c3
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9"

# Ceph Cluster
CEPH_CLUSTER_VERSION="${CEPH_VERSION}"

CEPH_BRANCH=wip-baum-20240403-01
CEPH_SHA=12ef028e30def7f77ac937868a61e7891cb9e8f2


CEPH_VSTART_ARGS="--memstore"
CEPH_DEVEL_MGR_PATH=../ceph

Expand Down
52 changes: 46 additions & 6 deletions control/cephutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import errno
import rbd
import rados
import time
from .utils import GatewayLogger

class CephUtils:
Expand All @@ -21,14 +22,54 @@ def __init__(self, config):
self.logger = GatewayLogger(config).logger
self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf")
self.rados_id = config.get_with_default("ceph", "id", "")
self.anagroup_list = []
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
    """Send *cmd* to the Ceph monitor and return its reply.

    A Rados handle is opened with the configured conf file and rados id for
    the duration of this single call; the command is sent with an empty
    input buffer.

    Args:
        cmd: the monitor command string to send.

    Returns:
        Whatever ``mon_command`` returns, unmodified.
    """
    connection = rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id)
    with connection as cluster:
        return cluster.mon_command(cmd, b'')

def get_number_created_gateways(self, pool, group):
    """Return the ANA group ids listed in the monitor's "nvme-gw show" reply.

    The result is cached on the instance: while the previous query is less
    than 10 seconds old and produced a non-empty list, that list is returned
    without contacting the monitor again.

    Args:
        pool: value placed in the "pool" field of the monitor command.
        group: value placed in the "group" field of the monitor command.

    Returns:
        The integers found between the first "[" of the reply text and the
        "]" that follows it. The list is empty when the brackets enclose
        nothing, when the reply has no "[", or when the query or the
        parsing fails (the failure is logged, not raised).
    """
    now = time.time()
    # NOTE(review): the cache is not keyed by pool/group, so a call with
    # different arguments inside the 10 second window gets the previous
    # answer. Presumably a gateway only ever asks about its own pool/group;
    # confirm against the callers.
    if (now - self.last_sent) < 10 and self.anagroup_list:
        self.logger.info(f" Caching response of the monitor: {self.anagroup_list} ")
        return self.anagroup_list

    self.anagroup_list = []
    self.last_sent = now
    try:
        cmd = '{' + f'"prefix":"nvme-gw show", "pool":"{pool}", "group":"{group}"' + '}'
        self.logger.info(f"nvme-show string: {cmd} ")
        rply = self.execute_ceph_monitor_command(cmd)
        self.logger.info(f"reply \"{rply}\"")
        conv_str = rply[1].decode()
        _, opening, tail = conv_str.partition("[")
        if opening:
            # partition() keeps the whole tail when "]" is missing instead of
            # silently chopping the last character off.
            new_str = tail.partition("]")[0].strip()
            self.logger.info(f"new_str : {new_str}")
            # split() without an argument collapses runs of whitespace and
            # yields no tokens for an empty list, so "[ ]" parses to [].
            self.anagroup_list = [int(token) for token in new_str.split()]
            self.logger.info(self.anagroup_list)
        else:
            self.logger.info("Gws not found")
    except Exception:
        # Best effort: a monitor or parsing failure must not propagate to the
        # caller; report it and fall back to "no groups known".
        self.logger.exception("Failure get number created gateways:")
        self.anagroup_list = []

    return self.anagroup_list

def fetch_and_display_ceph_version(self):
try:
with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster:
rply = cluster.mon_command('{"prefix":"mon versions"}', b'')
ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"')
ceph_ver = ceph_ver.removeprefix("ceph version ")
self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"")
rply = self.execute_ceph_monitor_command('{"prefix":"mon versions"}')
ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"')
ceph_ver = ceph_ver.removeprefix("ceph version ")
self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"")
except Exception:
self.logger.exception(f"Failure fetching Ceph version:")
pass
Expand Down Expand Up @@ -90,4 +131,3 @@ def create_image(self, pool_name, image_name, size) -> bool:
raise rc_ex

return rc

9 changes: 4 additions & 5 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1258,10 +1258,9 @@ def ns_add(self, args):
args.block_size = 512
if args.block_size <= 0:
self.cli.parser.error("block-size value must be positive")
if args.load_balancing_group == None:
args.load_balancing_group = 1
if args.load_balancing_group <= 0:
self.cli.parser.error("load-balancing-group value must be positive")

if args.load_balancing_group < 0:
self.cli.parser.error("load-balancing-group value must be positive")
if args.nsid != None and args.nsid <= 0:
self.cli.parser.error("nsid value must be positive")
if args.rbd_create_image:
Expand Down Expand Up @@ -1795,7 +1794,7 @@ def ns_set_qos(self, args):
argument("--rbd-image", "-i", help="RBD image name", required=True),
argument("--rbd-create-image", "-c", help="Create RBD image if needed", action='store_true', required=False),
argument("--block-size", "-s", help="Block size", type=int),
argument("--load-balancing-group", "-l", help="Load balancing group", type=int),
argument("--load-balancing-group", "-l", help="Load balancing group", type=int, default=0),
argument("--size", help="Size in bytes or specified unit (KB, KiB, MB, MiB, GB, GiB, TB, TiB)"),
argument("--force", help="Create a namespace even its image is already used by another namespace", action='store_true', required=False),
]
Expand Down
39 changes: 36 additions & 3 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.bdev_params = {}
self.subsystem_nsid_bdev = defaultdict(dict)
self.subsystem_nsid_anagrp = defaultdict(dict)
self.gateway_group = self.config.get("gateway", "group")
self.gateway_group = self.config.get_with_default("gateway", "group", "")
self.gateway_pool = self.config.get_with_default("ceph", "pool", "")
self._init_cluster_context()
self.subsys_ha = {}
self.subsys_max_ns = {}
Expand Down Expand Up @@ -871,12 +872,35 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
return pb2.req_status(status=ret_recycle , error_message=errmsg)
return pb2.req_status(status=True)

def choose_anagrpid_for_namespace(self, nsid) -> int:
    """Choose the ANA (load balancing) group for a new namespace.

    The groups reported by the monitor are checked first: the first one that
    has no entries in ``self.clusters`` is returned immediately. Otherwise
    the values stored under each group in ``self.clusters`` are summed and
    the group with the smallest total is returned; on a tie the group that
    comes last in iteration order wins.

    Args:
        nsid: namespace id, used only in the log messages.

    Returns:
        The chosen ANA group id, or 0 when ``self.clusters`` is empty and the
        monitor reported no unused group.
    """
    grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
    for ana_grp in grps_list:
        # .get() treats a group that is missing from self.clusters like an
        # empty one, without raising KeyError and without inserting an entry.
        if not self.clusters.get(ana_grp):
            # Still no namespaces in this ANA group - probably a newly added GW.
            self.logger.info(f"New GW created: chosen ana group {ana_grp} for ns {nsid} ")
            return ana_grp

    # No unused ANA group was found: pick the least loaded one.
    # min_load starts unset rather than at a fixed ceiling, so a group is
    # still chosen when every group carries a large load.
    chosen_ana_group = 0
    min_load = None
    for ana_grp, members in self.clusters.items():
        load = sum(members.values())  # total load of all clusters in this group
        self.logger.info(f" ana group {ana_grp} load = {load} ")
        if min_load is None or load <= min_load:
            min_load = load
            chosen_ana_group = ana_grp
            self.logger.info(f" ana group {ana_grp} load = {load} set as min {min_load} ")
    self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ")
    return chosen_ana_group

def namespace_add_safe(self, request, context):
"""Adds a namespace to a subsystem."""

peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, context: {context}{peer_msg}")
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")

if not request.uuid:
request.uuid = str(uuid.uuid4())
Expand All @@ -897,7 +921,16 @@ def namespace_add_safe(self, request, context):
create_image = request.create_image
if not context:
create_image = False
anagrp = int(request.anagrpid) if request.anagrpid is not None else 0
else: # new namespace
if request.anagrpid == 0:
anagrp = self.choose_anagrpid_for_namespace(request.nsid)
# if anagrp == 0:
# errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}"
# self.logger.error(errmsg)
# return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
request.anagrpid = anagrp

anagrp = request.anagrpid
ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
request.rbd_image_name, request.block_size, create_image, request.size, peer_msg)
if ret_bdev.status != 0:
Expand Down
32 changes: 16 additions & 16 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,20 @@ def test_add_namespace_wrong_block_size(self, caplog, gateway):

def test_add_namespace(self, caplog, gateway):
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"RBD pool junk doesn't exist" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert "Allocated cluster name='cluster_context_1_0'" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--load-balancing-group", "1", "--force"])
assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text
assert f"Can't create RBD image {image}" in caplog.text
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB", "--load-balancing-group", "1"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -265,7 +265,7 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image", "--load-balancing-group", "1"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -274,7 +274,7 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--load-balancing-group", "1", "--rbd-create-image"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -283,14 +283,14 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--load-balancing-group", "1", "--rbd-create-image"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
assert "must be numeric" in caplog.text
assert rc == 2
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024", "--load-balancing-group", "1"])
assert f"Adding namespace 2 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "1"])
Expand All @@ -316,21 +316,21 @@ def test_add_namespace(self, caplog, gateway):
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid])
assert f'"load_balancing_group": {anagrpid2}' in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"Adding namespace 3 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"])
assert '"rbd_image_size": "4294967296"' in caplog.text

def test_add_namespace_ipv6(self, caplog, gateway):
caplog.clear()
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--force"])
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1","--force"])
assert f"Adding namespace 4 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert f'will continue as the "force" argument was used' in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"])
assert '"load_balancing_group": 1' in caplog.text
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--force"])
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 8 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "8"])
Expand All @@ -339,18 +339,18 @@ def test_add_namespace_ipv6(self, caplog, gateway):
def test_add_namespace_same_image(self, caplog, gateway):
caplog.clear()
img_name = f"{image}_test"
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"Adding namespace 5 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f"you can find the offending namespace by using" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1"])
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f"you can find the offending namespace by using" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 6 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f'will continue as the "force" argument was used' in caplog.text
Expand Down Expand Up @@ -434,7 +434,7 @@ def test_resize_namespace(self, caplog, gateway):
cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", nsid, "--size", "128MiB"])
assert f"Failure resizing namespace using NSID {nsid} on {subsystem}: Can't find namespace" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force", "--load-balancing-group", "1"])
assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", "3", "--size", "6GiB"])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
def create_resource_by_index(i):
subsystem = f"{subsystem_prefix}{i}"
cli(["subsystem", "add", "--subsystem", subsystem])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image","--load-balancing-group", "1", "--force"])

def check_resource_by_index(i, caplog):
subsystem = f"{subsystem_prefix}{i}"
Expand Down
1 change: 1 addition & 0 deletions tests/test_namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def create_namespace(stub, rbd_pool, rbd_image, nsid):
rbd_image_name=rbd_image,
nsid=nsid,
block_size=4096,
anagrpid=1,
force=True)
ret_namespace = stub.namespace_add(namespace_req)
if ret_namespace.status != errno.ETIMEDOUT:
Expand Down

0 comments on commit b4028c3

Please sign in to comment.