Skip to content

Commit

Permalink
Change namespace balancing group using a direct SPDK call.
Browse files Browse the repository at this point in the history
Fixes #754

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Jul 10, 2024
1 parent c759f4f commit 1830497
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 150 deletions.
6 changes: 3 additions & 3 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,9 +1671,9 @@ def ns_change_load_balancing_group(self, args):
"""Change namespace load balancing group."""

out_func, err_func = self.get_output_functions(args)
if args.nsid == None and args.uuid == None:
self.cli.parser.error("At least one of --nsid or --uuid arguments is mandatory for change_load_balancing_group command")
if args.nsid != None and args.nsid <= 0:
if args.nsid == None:
self.cli.parser.error("--nsid argument is mandatory for change_load_balancing_group command")
if args.nsid <= 0:
self.cli.parser.error("nsid value must be positive")
if args.load_balancing_group <= 0:
self.cli.parser.error("load-balancing-group value must be positive")
Expand Down
187 changes: 43 additions & 144 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,160 +1026,56 @@ def namespace_change_load_balancing_group_safe(self, request, context):

grps_list = []
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")
change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.info(f"Received request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

if not request.nsid:
errmsg = f"Failure changing load balancing group for namespace in {request.subsystem_nqn}: No NSID was given"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

ns_entry = None
state = self.gateway_state.local.get_state()
if request.nsid:
ns_key = GatewayState.build_namespace_key(request.subsystem_nqn, request.nsid)
try:
state_ns = state[ns_key]
ns_entry = json.loads(state_ns)
assert request.nsid == ns_entry["nsid"], f'Got a namespace with NSID {ns_entry["nsid"]} which is different than the requested one {request.nsid}'
assert request.subsystem_nqn == ns_entry["subsystem_nqn"], f'Got a namespace from subsystem {ns_entry["subsystem_nqn"]} which is different than the requested one {request.subsystem_nqn}'
except Exception as ex:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
self.logger.error(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)
elif request.uuid:
for key, val in state.items():
if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX):
continue
try:
ns = json.loads(val)
if ns["uuid"] == request.uuid:
ns_entry = ns
break
except Exception:
self.logger.exception(f"Got exception trying to get subsystem {nqn} namespaces")
pass
else:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. At least one of NSID or UUID should be specified"
self.logger.error(errmsg)
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

if not ns_entry:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

if not request.nsid:
try:
request.nsid = ns_entry["nsid"]
except Exception:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace NSID"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

create_image = False
ns_size = None
force = False
try:
create_image = ns_entry["create_image"]
ns_size = ns_entry["size"]
force = ns_entry["force"]
except Exception:
self.logger.warning(f"Can't get all the attributes for namespace {nsid_msg}in {request.subsystem_nqn}.")

ns_qos_entry = None
ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, request.nsid)
try:
state_ns_qos = state[ns_qos_key]
ns_qos_entry = json.loads(state_ns_qos)
except Exception:
self.logger.debug(f"No QOS limits found for namespace {nsid_msg}in {request.subsystem_nqn}")
if ns_qos_entry:
try:
if ns_qos_entry["subsystem_nqn"] != request.subsystem_nqn:
errmsg = f'Got subsystem {ns_qos_entry["subsystem_nqn"]} for QOS limits instead of {request.subsystem_nqn}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
if ns_qos_entry["nsid"] != request.nsid:
errmsg = f'Got NSID {ns_qos_entry["nsid"]} for QOS limits instead of {request.nsid}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
except Exception:
self.logger.error(f"Error processing QOS limits, will not set QOS")
ns_qos_entry = None

set_qos_req = None
if ns_qos_entry:
set_qos_req = pb2.namespace_set_qos_req()
set_qos_req.subsystem_nqn = request.subsystem_nqn
set_qos_req.nsid = request.nsid
if request.uuid:
set_qos_req.uuid = request.uuid
try:
set_qos_req.rw_ios_per_second = int(ns_qos_entry["rw_ios_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_ios_per_second")
try:
set_qos_req.rw_mbytes_per_second = int(ns_qos_entry["rw_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_mbytes_per_second")
try:
set_qos_req.r_mbytes_per_second = int(ns_qos_entry["r_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute r_mbytes_per_second")
try:
set_qos_req.w_mbytes_per_second = int(ns_qos_entry["w_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute w_mbytes_per_second")

namespace_add_req = pb2.namespace_add_req()
namespace_add_req.subsystem_nqn = request.subsystem_nqn
namespace_add_req.nsid = request.nsid
if request.uuid:
namespace_add_req.uuid = request.uuid
namespace_add_req.anagrpid = request.anagrpid
namespace_add_req.create_image = create_image
if ns_size:
namespace_add_req.size = int(ns_size)
namespace_add_req.force = force
errmsg = None
try:
namespace_add_req.rbd_pool_name=ns_entry["rbd_pool_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD pool name"
try:
namespace_add_req.rbd_image_name=ns_entry["rbd_image_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD image name"
try:
namespace_add_req.block_size=ns_entry["block_size"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find associated block device block size"

if errmsg:
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

del_req = pb2.namespace_delete_req(subsystem_nqn=request.subsystem_nqn, nsid=request.nsid, uuid=request.uuid)
ret_del = self.namespace_delete_safe(del_req, context)
if ret_del.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't delete namespace: {ret_del.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_del.status, error_message=errmsg)
ret = rpc_nvmf.nvmf_subsystem_set_ns_anagrpid(
self.spdk_rpc_client,
nqn=request.subsystem_nqn,
nsid=request.nsid,
anagrpid=request.anagrpid
)
self.logger.debug(f"nvmf_subsystem_set_ns_anagrpid: {ret}")
except Exception as ex:
errmsg = f"{change_lb_group_failure_prefix}:\n{ex}"
resp = self.parse_json_exeption(ex)
status = errno.EINVAL
if resp:
status = resp["code"]
errmsg = f"{change_lb_group_failure_prefix}: {resp['message']}"
return pb2.req_status(status=status, error_message=errmsg)

ret_ns = self.namespace_add_safe(namespace_add_req, context)
if ret_ns.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_ns.status, error_message=errmsg)
# Just in case SPDK failed with no exception
if not ret:
self.logger.error(change_lb_group_failure_prefix)
return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix)

if set_qos_req:
ret_qos = self.namespace_set_qos_limits_safe(set_qos_req, context)
if context:
# Update gateway state
try:
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True, including_default_value_fields=True)
self.gateway_state.add_namespace_lb_group(request.subsystem_nqn, request.nsid, json_req)
except Exception as ex:
errmsg = f"Error persisting namespace load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.exception(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

return pb2.req_status(status=0, error_message=os.strerror(0))

Expand All @@ -1197,8 +1093,11 @@ def remove_namespace_from_state(self, nqn, nsid, context):
# Update gateway state
try:
self.gateway_state.remove_namespace_qos(nqn, str(nsid))
except Exception:
self.logger.warning(f"Error removing namespace's QOS limits, they might not have been set")
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace_lb_group(nqn, str(nsid))
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace(nqn, str(nsid))
Expand Down
7 changes: 7 additions & 0 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,13 @@ def gateway_rpc_caller(self, requests, is_add_req):
else:
# Do nothing, this is covered by the delete namespace code
pass
elif key.startswith(GatewayState.NAMESPACE_LB_GROUP_PREFIX):
if is_add_req:
req = json_format.Parse(val, pb2.namespace_change_load_balancing_group_req(), ignore_unknown_fields=True)
self.gateway_rpc.namespace_change_load_balancing_group(req)
else:
# Do nothing, this is covered by the delete namespace code
pass
elif key.startswith(GatewayState.HOST_PREFIX):
if is_add_req:
req = json_format.Parse(val, pb2.add_host_req(), ignore_unknown_fields=True)
Expand Down
31 changes: 30 additions & 1 deletion control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class GatewayState(ABC):
HOST_PREFIX = "host" + OMAP_KEY_DELIMITER
LISTENER_PREFIX = "listener" + OMAP_KEY_DELIMITER
NAMESPACE_QOS_PREFIX = "qos" + OMAP_KEY_DELIMITER
NAMESPACE_LB_GROUP_PREFIX = "lbgroup" + OMAP_KEY_DELIMITER

def is_key_element_valid(s: str) -> bool:
if type(s) != str:
Expand All @@ -51,6 +52,12 @@ def build_namespace_qos_key(subsystem_nqn: str, nsid) -> str:
key += GatewayState.OMAP_KEY_DELIMITER + str(nsid)
return key

def build_namespace_lb_group_key(subsystem_nqn: str, nsid) -> str:
key = GatewayState.NAMESPACE_LB_GROUP_PREFIX + subsystem_nqn
if nsid is not None:
key += GatewayState.OMAP_KEY_DELIMITER + str(nsid)
return key

def build_subsystem_key(subsystem_nqn: str) -> str:
return GatewayState.SUBSYSTEM_PREFIX + subsystem_nqn

Expand Down Expand Up @@ -111,6 +118,16 @@ def remove_namespace_qos(self, subsystem_nqn: str, nsid: str):
key = GatewayState.build_namespace_qos_key(subsystem_nqn, nsid)
self._remove_key(key)

def add_namespace_lb_group(self, subsystem_nqn: str, nsid: str, val: str):
"""Adds namespace's load balancing group to the state data store."""
key = GatewayState.build_namespace_lb_group_key(subsystem_nqn, nsid)
self._add_key(key, val)

def remove_namespace_lb_group(self, subsystem_nqn: str, nsid: str):
"""Removes namespace's load balancing group from the state data store."""
key = GatewayState.build_namespace_lb_group_key(subsystem_nqn, nsid)
self._remove_key(key)

def add_subsystem(self, subsystem_nqn: str, val: str):
"""Adds a subsystem to the state data store."""
key = GatewayState.build_subsystem_key(subsystem_nqn)
Expand Down Expand Up @@ -600,6 +617,16 @@ def remove_namespace_qos(self, subsystem_nqn: str, nsid: str):
self.omap.remove_namespace_qos(subsystem_nqn, nsid)
self.local.remove_namespace_qos(subsystem_nqn, nsid)

def add_namespace_lb_group(self, subsystem_nqn: str, nsid: str, val: str):
""""Adds namespace's load balancing group to the state data store."""
self.omap.add_namespace_lb_group(subsystem_nqn, nsid, val)
self.local.add_namespace_lb_group(subsystem_nqn, nsid, val)

def remove_namespace_lb_group(self, subsystem_nqn: str, nsid: str):
"""Removes namespace's load balancing group from the state data store."""
self.omap.remove_namespace_lb_group(subsystem_nqn, nsid)
self.local.remove_namespace_lb_group(subsystem_nqn, nsid)

def add_subsystem(self, subsystem_nqn: str, val: str):
"""Adds a subsystem to the state data store."""
self.omap.add_subsystem(subsystem_nqn, val)
Expand Down Expand Up @@ -685,8 +712,10 @@ def update(self) -> bool:
with self.update_is_active_lock:
prefix_list = [
GatewayState.SUBSYSTEM_PREFIX,
GatewayState.NAMESPACE_PREFIX, GatewayState.HOST_PREFIX,
GatewayState.NAMESPACE_PREFIX,
GatewayState.HOST_PREFIX,
GatewayState.NAMESPACE_QOS_PREFIX,
GatewayState.NAMESPACE_LB_GROUP_PREFIX,
GatewayState.LISTENER_PREFIX
]

Expand Down
2 changes: 0 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,6 @@ def test_remove_namespace(self, caplog, gateway):
caplog.clear()
cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "6"])
assert f"Deleting namespace 6 from {subsystem}: Successful" in caplog.text
assert f"Error removing namespace's QOS limits, they might not have been set" not in caplog.text
bdev_found = False
bdev_list = rpc_bdev.bdev_get_bdevs(gw.spdk_rpc_client)
for b in bdev_list:
Expand All @@ -836,7 +835,6 @@ def test_remove_namespace(self, caplog, gateway):
caplog.clear()
cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "2"])
assert f"Deleting namespace 2 from {subsystem}: Successful" in caplog.text
assert f"Error removing namespace's QOS limits, they might not have been set" in caplog.text
caplog.clear()
cli(["namespace", "del", "--subsystem", subsystem, "--nsid", "4"])
assert f"Deleting namespace 4 from {subsystem}: Successful" in caplog.text
Expand Down

0 comments on commit 1830497

Please sign in to comment.