Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create namespace with automatic allocation of load balancing group #491

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9"

# Ceph Cluster
CEPH_CLUSTER_VERSION="${CEPH_VERSION}"

CEPH_BRANCH=wip-baum-20240403-01
CEPH_SHA=12ef028e30def7f77ac937868a61e7891cb9e8f2


CEPH_VSTART_ARGS="--memstore"
CEPH_DEVEL_MGR_PATH=../ceph

Expand Down
52 changes: 46 additions & 6 deletions control/cephutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import errno
import rbd
import rados
import time
from .utils import GatewayLogger

class CephUtils:
Expand All @@ -21,14 +22,54 @@ def __init__(self, config):
self.logger = GatewayLogger(config).logger
self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf")
self.rados_id = config.get_with_default("ceph", "id", "")
self.anagroup_list = []
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster:
rply = cluster.mon_command(cmd, b'')
return rply

def get_number_created_gateways(self, pool, group):
now = time.time()
if (now - self.last_sent) < 10 and self.anagroup_list :
self.logger.info(f" Caching response of the monitor: {self.anagroup_list} ")
return self.anagroup_list
else :
try:
self.anagroup_list = []
self.last_sent = now
str = '{' + f'"prefix":"nvme-gw show", "pool":"{pool}", "group":"{group}"' + '}'
self.logger.info(f"nvme-show string: {str} ")
rply = self.execute_ceph_monitor_command(str)
self.logger.info(f"reply \"{rply}\"")
conv_str = rply[1].decode()
pos = conv_str.find("[")
if pos!= -1:
new_str = conv_str[pos+ len("[") :]
pos = new_str.find("]")
new_str = new_str[: pos].strip()
int_str_list = new_str.split(' ')
self.logger.info(f"new_str : {new_str}")
for x in int_str_list:
self.anagroup_list.append(int(x))
self.logger.info(self.anagroup_list)
else:
self.logger.info("Gws not found")

except Exception:
self.logger.exception(f"Failure get number created gateways:")
leonidc marked this conversation as resolved.
Show resolved Hide resolved
self.anagroup_list = []
pass
leonidc marked this conversation as resolved.
Show resolved Hide resolved

return self.anagroup_list

def fetch_and_display_ceph_version(self):
try:
with rados.Rados(conffile=self.ceph_conf, rados_id=self.rados_id) as cluster:
rply = cluster.mon_command('{"prefix":"mon versions"}', b'')
ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"')
ceph_ver = ceph_ver.removeprefix("ceph version ")
self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"")
rply = self.execute_ceph_monitor_command('{"prefix":"mon versions"}')
ceph_ver = rply[1].decode().removeprefix("{").strip().split(":")[0].removeprefix('"').removesuffix('"')
ceph_ver = ceph_ver.removeprefix("ceph version ")
self.logger.info(f"Connected to Ceph with version \"{ceph_ver}\"")
except Exception:
self.logger.exception(f"Failure fetching Ceph version:")
pass
Expand Down Expand Up @@ -90,4 +131,3 @@ def create_image(self, pool_name, image_name, size) -> bool:
raise rc_ex

return rc

9 changes: 4 additions & 5 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1258,10 +1258,9 @@ def ns_add(self, args):
args.block_size = 512
if args.block_size <= 0:
self.cli.parser.error("block-size value must be positive")
if args.load_balancing_group == None:
args.load_balancing_group = 1
if args.load_balancing_group <= 0:
self.cli.parser.error("load-balancing-group value must be positive")

if args.load_balancing_group < 0:
self.cli.parser.error("load-balancing-group value must be positive")
if args.nsid != None and args.nsid <= 0:
self.cli.parser.error("nsid value must be positive")
if args.rbd_create_image:
Expand Down Expand Up @@ -1795,7 +1794,7 @@ def ns_set_qos(self, args):
argument("--rbd-image", "-i", help="RBD image name", required=True),
argument("--rbd-create-image", "-c", help="Create RBD image if needed", action='store_true', required=False),
argument("--block-size", "-s", help="Block size", type=int),
argument("--load-balancing-group", "-l", help="Load balancing group", type=int),
argument("--load-balancing-group", "-l", help="Load balancing group", type=int, default=0),
argument("--size", help="Size in bytes or specified unit (KB, KiB, MB, MiB, GB, GiB, TB, TiB)"),
argument("--force", help="Create a namespace even its image is already used by another namespace", action='store_true', required=False),
]
Expand Down
39 changes: 36 additions & 3 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.bdev_params = {}
self.subsystem_nsid_bdev = defaultdict(dict)
self.subsystem_nsid_anagrp = defaultdict(dict)
self.gateway_group = self.config.get("gateway", "group")
self.gateway_group = self.config.get_with_default("gateway", "group", "")
self.gateway_pool = self.config.get_with_default("ceph", "pool", "")
self._init_cluster_context()
self.subsys_ha = {}
self.subsys_max_ns = {}
Expand Down Expand Up @@ -871,12 +872,35 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
return pb2.req_status(status=ret_recycle , error_message=errmsg)
return pb2.req_status(status=True)

def choose_anagrpid_for_namespace(self, nsid) ->int:
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
for ana_grp in grps_list:
if not self.clusters[ana_grp]: # still no namespaces in this ana-group - probably the new GW added
self.logger.info(f"New GW created: chosen ana group {ana_grp} for ns {nsid} ")
return ana_grp
#not found ana_grp .To calulate it. Find minimum loaded ana_grp cluster
ana_load = {}
min_load = 2000
chosen_ana_group = 0
for ana_grp in self.clusters:
ana_load[ana_grp] = 0;
for name in self.clusters[ana_grp]:
ana_load[ana_grp] += self.clusters[ana_grp][name] # accumulate the total load per ana group for all ana_grp clusters
for ana_grp in ana_load :
self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} ")
if ana_load[ana_grp] <= min_load:
min_load = ana_load[ana_grp]
chosen_ana_group = ana_grp
self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} set as min {min_load} ")
self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ")
return chosen_ana_group

def namespace_add_safe(self, request, context):
"""Adds a namespace to a subsystem."""

peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, context: {context}{peer_msg}")
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")

if not request.uuid:
request.uuid = str(uuid.uuid4())
Expand All @@ -897,7 +921,16 @@ def namespace_add_safe(self, request, context):
create_image = request.create_image
if not context:
create_image = False
anagrp = int(request.anagrpid) if request.anagrpid is not None else 0
else: # new namespace
if request.anagrpid == 0:
anagrp = self.choose_anagrpid_for_namespace(request.nsid)
# if anagrp == 0:
# errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}"
# self.logger.error(errmsg)
# return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
request.anagrpid = anagrp

anagrp = request.anagrpid
ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
request.rbd_image_name, request.block_size, create_image, request.size, peer_msg)
if ret_bdev.status != 0:
Expand Down
32 changes: 16 additions & 16 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,20 @@ def test_add_namespace_wrong_block_size(self, caplog, gateway):

def test_add_namespace(self, caplog, gateway):
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", "junk", "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"RBD pool junk doesn't exist" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--uuid", uuid, "--size", "16MiB", "--rbd-create-image", "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert "Allocated cluster name='cluster_context_1_0'" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--size", "36M", "--rbd-create-image", "--load-balancing-group", "1", "--force"])
assert f"Image {pool}/{image2} already exists with a size of 16777216 bytes which differs from the requested size of 37748736 bytes" in caplog.text
assert f"Can't create RBD image {image}" in caplog.text
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16MiB", "--load-balancing-group", "1"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -265,7 +265,7 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size=-16MiB", "--rbd-create-image", "--load-balancing-group", "1"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -274,7 +274,7 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "1x6MiB", "--load-balancing-group", "1", "--rbd-create-image"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
Expand All @@ -283,14 +283,14 @@ def test_add_namespace(self, caplog, gateway):
caplog.clear()
rc = 0
try:
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image2, "--block-size", "1024", "--size", "16mB", "--load-balancing-group", "1", "--rbd-create-image"])
except SystemExit as sysex:
rc = int(str(sysex))
pass
assert "must be numeric" in caplog.text
assert rc == 2
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--block-size", "1024", "--load-balancing-group", "1"])
assert f"Adding namespace 2 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "1"])
Expand All @@ -316,21 +316,21 @@ def test_add_namespace(self, caplog, gateway):
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", nsid])
assert f'"load_balancing_group": {anagrpid2}' in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image3, "--size", "4GiB", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"Adding namespace 3 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"])
assert '"rbd_image_size": "4294967296"' in caplog.text

def test_add_namespace_ipv6(self, caplog, gateway):
caplog.clear()
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--force"])
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1","--force"])
assert f"Adding namespace 4 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert f'will continue as the "force" argument was used' in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "3"])
assert '"load_balancing_group": 1' in caplog.text
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--force"])
cli(["--server-address", server_addr_ipv6, "namespace", "add", "--subsystem", subsystem, "--nsid", "8", "--rbd-pool", pool, "--rbd-image", image, "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 8 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["--format", "json", "namespace", "list", "--subsystem", subsystem, "--nsid", "8"])
Expand All @@ -339,18 +339,18 @@ def test_add_namespace_ipv6(self, caplog, gateway):
def test_add_namespace_same_image(self, caplog, gateway):
caplog.clear()
img_name = f"{image}_test"
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"Adding namespace 5 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--rbd-create-image"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--size", "16MiB", "--load-balancing-group", "1", "--rbd-create-image", "--load-balancing-group", "1"])
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f"you can find the offending namespace by using" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1"])
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f"you can find the offending namespace by using" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", img_name, "--load-balancing-group", "1", "--force"])
assert f"Adding namespace 6 to {subsystem}, load balancing group 1: Successful" in caplog.text
assert f"RBD image {pool}/{img_name} is already used by a namespace" in caplog.text
assert f'will continue as the "force" argument was used' in caplog.text
Expand Down Expand Up @@ -434,7 +434,7 @@ def test_resize_namespace(self, caplog, gateway):
cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", nsid, "--size", "128MiB"])
assert f"Failure resizing namespace using NSID {nsid} on {subsystem}: Can't find namespace" in caplog.text
caplog.clear()
cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--nsid", nsid, "--rbd-pool", pool, "--rbd-image", image, "--uuid", uuid, "--force", "--load-balancing-group", "1"])
assert f"Adding namespace 1 to {subsystem}, load balancing group 1: Successful" in caplog.text
caplog.clear()
cli(["namespace", "resize", "--subsystem", subsystem, "--nsid", "3", "--size", "6GiB"])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
def create_resource_by_index(i):
subsystem = f"{subsystem_prefix}{i}"
cli(["subsystem", "add", "--subsystem", subsystem])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image", "--force"])
cli(["namespace", "add", "--subsystem", subsystem, "--rbd-pool", pool, "--rbd-image", image, "--size", "16MiB", "--rbd-create-image","--load-balancing-group", "1", "--force"])

def check_resource_by_index(i, caplog):
subsystem = f"{subsystem_prefix}{i}"
Expand Down
1 change: 1 addition & 0 deletions tests/test_namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def create_namespace(stub, rbd_pool, rbd_image, nsid):
rbd_image_name=rbd_image,
nsid=nsid,
block_size=4096,
anagrpid=1,
force=True)
ret_namespace = stub.namespace_add(namespace_req)
if ret_namespace.status != errno.ETIMEDOUT:
Expand Down
Loading