Skip to content

Commit

Permalink
Change namespace balancing group using a direct SPDK call
Browse files Browse the repository at this point in the history
Fixes ceph#754

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Jul 17, 2024
1 parent 668c32c commit 91f9574
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 159 deletions.
6 changes: 3 additions & 3 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,9 +1671,9 @@ def ns_change_load_balancing_group(self, args):
"""Change namespace load balancing group."""

out_func, err_func = self.get_output_functions(args)
if args.nsid == None and args.uuid == None:
self.cli.parser.error("At least one of --nsid or --uuid arguments is mandatory for change_load_balancing_group command")
if args.nsid != None and args.nsid <= 0:
if args.nsid == None:
self.cli.parser.error("--nsid argument is mandatory for change_load_balancing_group command")
if args.nsid <= 0:
self.cli.parser.error("nsid value must be positive")
if args.load_balancing_group <= 0:
self.cli.parser.error("load-balancing-group value must be positive")
Expand Down
206 changes: 65 additions & 141 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ def namespace_add_safe(self, request, context):
anagrp = 0
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid}, context: {context}{peer_msg}")

if not request.uuid:
request.uuid = str(uuid.uuid4())
Expand Down Expand Up @@ -1026,160 +1026,81 @@ def namespace_change_load_balancing_group_safe(self, request, context):

grps_list = []
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")
change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.info(f"Received request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

if not request.nsid:
errmsg = f"Failure changing load balancing group for namespace in {request.subsystem_nqn}: No NSID was given"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

ns_entry = None
state = self.gateway_state.local.get_state()
if request.nsid:
if context:
# notice that the local state might not be up to date in case we're in the middle of update() but as the
# context is not None, we are not in an update(), the omap lock made sure that we got here with an updated local state
state = self.gateway_state.local.get_state()
ns_key = GatewayState.build_namespace_key(request.subsystem_nqn, request.nsid)
try:
state_ns = state[ns_key]
ns_entry = json.loads(state_ns)
assert request.nsid == ns_entry["nsid"], f'Got a namespace with NSID {ns_entry["nsid"]} which is different than the requested one {request.nsid}'
assert request.subsystem_nqn == ns_entry["subsystem_nqn"], f'Got a namespace from subsystem {ns_entry["subsystem_nqn"]} which is different than the requested one {request.subsystem_nqn}'
except Exception as ex:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
errmsg = f"{change_lb_group_failure_prefix}: Can't find entry for namespace {request.nsid} in {request.subsystem_nqn}"
self.logger.error(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)
elif request.uuid:
for key, val in state.items():
if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX):
continue
try:
ns = json.loads(val)
if ns["uuid"] == request.uuid:
ns_entry = ns
break
except Exception:
self.logger.exception(f"Got exception trying to get subsystem {nqn} namespaces")
pass
else:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. At least one of NSID or UUID should be specified"
self.logger.error(errmsg)
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

if not ns_entry:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

if not request.nsid:
try:
request.nsid = ns_entry["nsid"]
except Exception:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace NSID"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

create_image = False
ns_size = None
force = False
try:
create_image = ns_entry["create_image"]
ns_size = ns_entry["size"]
force = ns_entry["force"]
except Exception:
self.logger.warning(f"Can't get all the attributes for namespace {nsid_msg}in {request.subsystem_nqn}.")

ns_qos_entry = None
ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, request.nsid)
try:
state_ns_qos = state[ns_qos_key]
ns_qos_entry = json.loads(state_ns_qos)
except Exception:
self.logger.debug(f"No QOS limits found for namespace {nsid_msg}in {request.subsystem_nqn}")
if ns_qos_entry:
try:
if ns_qos_entry["subsystem_nqn"] != request.subsystem_nqn:
errmsg = f'Got subsystem {ns_qos_entry["subsystem_nqn"]} for QOS limits instead of {request.subsystem_nqn}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
if ns_qos_entry["nsid"] != request.nsid:
errmsg = f'Got NSID {ns_qos_entry["nsid"]} for QOS limits instead of {request.nsid}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
except Exception:
self.logger.error(f"Error processing QOS limits, will not set QOS")
ns_qos_entry = None

set_qos_req = None
if ns_qos_entry:
set_qos_req = pb2.namespace_set_qos_req()
set_qos_req.subsystem_nqn = request.subsystem_nqn
set_qos_req.nsid = request.nsid
if request.uuid:
set_qos_req.uuid = request.uuid
try:
set_qos_req.rw_ios_per_second = int(ns_qos_entry["rw_ios_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_ios_per_second")
try:
set_qos_req.rw_mbytes_per_second = int(ns_qos_entry["rw_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_mbytes_per_second")
try:
set_qos_req.r_mbytes_per_second = int(ns_qos_entry["r_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute r_mbytes_per_second")
try:
set_qos_req.w_mbytes_per_second = int(ns_qos_entry["w_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute w_mbytes_per_second")

namespace_add_req = pb2.namespace_add_req()
namespace_add_req.subsystem_nqn = request.subsystem_nqn
namespace_add_req.nsid = request.nsid
if request.uuid:
namespace_add_req.uuid = request.uuid
namespace_add_req.anagrpid = request.anagrpid
namespace_add_req.create_image = create_image
if ns_size:
namespace_add_req.size = int(ns_size)
namespace_add_req.force = force
errmsg = None
try:
namespace_add_req.rbd_pool_name=ns_entry["rbd_pool_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD pool name"
try:
namespace_add_req.rbd_image_name=ns_entry["rbd_image_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD image name"
try:
namespace_add_req.block_size=ns_entry["block_size"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find associated block device block size"

if errmsg:
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

del_req = pb2.namespace_delete_req(subsystem_nqn=request.subsystem_nqn, nsid=request.nsid, uuid=request.uuid)
ret_del = self.namespace_delete_safe(del_req, context)
if ret_del.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't delete namespace: {ret_del.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_del.status, error_message=errmsg)
ret = rpc_nvmf.nvmf_subsystem_set_ns_anagrpid(
self.spdk_rpc_client,
nqn=request.subsystem_nqn,
nsid=request.nsid,
anagrpid=request.anagrpid
)
self.logger.debug(f"nvmf_subsystem_set_ns_anagrpid: {ret}")
except Exception as ex:
errmsg = f"{change_lb_group_failure_prefix}:\n{ex}"
resp = self.parse_json_exeption(ex)
status = errno.EINVAL
if resp:
status = resp["code"]
errmsg = f"{change_lb_group_failure_prefix}: {resp['message']}"
return pb2.req_status(status=status, error_message=errmsg)

ret_ns = self.namespace_add_safe(namespace_add_req, context)
if ret_ns.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_ns.status, error_message=errmsg)
# Just in case SPDK failed with no exception
if not ret:
self.logger.error(change_lb_group_failure_prefix)
return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix)

if set_qos_req:
ret_qos = self.namespace_set_qos_limits_safe(set_qos_req, context)
if context:
assert ns_entry, "Namespace entry is None for non-update call"
# Update gateway state
try:
add_req = pb2.namespace_add_req(rbd_pool_name=ns_entry["rbd_pool_name"],
rbd_image_name=ns_entry["rbd_image_name"],
subsystem_nqn=ns_entry["subsystem_nqn"],
nsid=ns_entry["nsid"],
block_size=ns_entry["block_size"],
uuid=ns_entry["uuid"],
anagrpid=request.anagrpid,
create_image=ns_entry["create_image"],
size=int(ns_entry["size"]),
force=ns_entry["force"])
json_req = json_format.MessageToJson(
add_req, preserving_proto_field_name=True, including_default_value_fields=True)
self.gateway_state.add_namespace(request.subsystem_nqn, request.nsid, json_req)
except Exception as ex:
errmsg = f"Error persisting namespace load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.exception(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

return pb2.req_status(status=0, error_message=os.strerror(0))

Expand All @@ -1197,8 +1118,11 @@ def remove_namespace_from_state(self, nqn, nsid, context):
# Update gateway state
try:
self.gateway_state.remove_namespace_qos(nqn, str(nsid))
except Exception:
self.logger.warning(f"Error removing namespace's QOS limits, they might not have been set")
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace_lb_group(nqn, str(nsid))
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace(nqn, str(nsid))
Expand Down
4 changes: 4 additions & 0 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,7 @@ def gateway_rpc_caller(self, requests, is_add_req):
else:
req = json_format.Parse(val, pb2.delete_listener_req(), ignore_unknown_fields=True)
self.gateway_rpc.delete_listener(req)
elif key.startswith(GatewayState.NAMESPACE_LB_GROUP_PREFIX):
if is_add_req:
req = json_format.Parse(val, pb2.namespace_change_load_balancing_group_req(), ignore_unknown_fields=True)
self.gateway_rpc.namespace_change_load_balancing_group(req)
Loading

0 comments on commit 91f9574

Please sign in to comment.