Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change namespace balancing group using a direct SPDK call #762

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,9 +1671,9 @@ def ns_change_load_balancing_group(self, args):
"""Change namespace load balancing group."""

out_func, err_func = self.get_output_functions(args)
if args.nsid == None and args.uuid == None:
self.cli.parser.error("At least one of --nsid or --uuid arguments is mandatory for change_load_balancing_group command")
if args.nsid != None and args.nsid <= 0:
if args.nsid == None:
self.cli.parser.error("--nsid argument is mandatory for change_load_balancing_group command")
if args.nsid <= 0:
self.cli.parser.error("nsid value must be positive")
if args.load_balancing_group <= 0:
self.cli.parser.error("load-balancing-group value must be positive")
Expand Down Expand Up @@ -1850,11 +1850,14 @@ def namespace(self, args):
@cli.cmd()
def get_subsystems(self, args):
"""Get subsystems"""
subsystems = json_format.MessageToJson(
self.stub.get_subsystems(pb2.get_subsystems_req()),
subsystems = self.stub.get_subsystems(pb2.get_subsystems_req())
if args.format == "python":
return subsystems
subsystems_out = json_format.MessageToJson(
subsystems,
indent=4, including_default_value_fields=True,
preserving_proto_field_name=True)
self.logger.info(f"Get subsystems:\n{subsystems}")
self.logger.info(f"Get subsystems:\n{subsystems_out}")

def main_common(client, args):
client.logger.setLevel(GatewayEnumUtils.get_value_from_key(pb2.GwLogLevel, args.log_level.lower()))
Expand Down
207 changes: 66 additions & 141 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ def namespace_add_safe(self, request, context):
anagrp = 0
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")
self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid}, context: {context}{peer_msg}")

if not request.uuid:
request.uuid = str(uuid.uuid4())
Expand Down Expand Up @@ -1026,160 +1026,82 @@ def namespace_change_load_balancing_group_safe(self, request, context):

grps_list = []
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")
change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.info(f"Received request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

if not request.nsid:
errmsg = f"Failure changing load balancing group for namespace in {request.subsystem_nqn}: No NSID was given"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

ns_entry = None
state = self.gateway_state.local.get_state()
if request.nsid:
if context:
# notice that the local state might not be up to date in case we're in the middle of update() but as the
# context is not None, we are not in an update(), the omap lock made sure that we got here with an updated local state
state = self.gateway_state.local.get_state()
ns_key = GatewayState.build_namespace_key(request.subsystem_nqn, request.nsid)
try:
state_ns = state[ns_key]
ns_entry = json.loads(state_ns)
assert request.nsid == ns_entry["nsid"], f'Got a namespace with NSID {ns_entry["nsid"]} which is different than the requested one {request.nsid}'
assert request.subsystem_nqn == ns_entry["subsystem_nqn"], f'Got a namespace from subsystem {ns_entry["subsystem_nqn"]} which is different than the requested one {request.subsystem_nqn}'
except Exception as ex:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
errmsg = f"{change_lb_group_failure_prefix}: Can't find entry for namespace {request.nsid} in {request.subsystem_nqn}"
self.logger.error(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)
elif request.uuid:
for key, val in state.items():
if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX):
continue
try:
ns = json.loads(val)
if ns["uuid"] == request.uuid:
ns_entry = ns
break
except Exception:
self.logger.exception(f"Got exception trying to get subsystem {nqn} namespaces")
pass
else:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. At least one of NSID or UUID should be specified"
self.logger.error(errmsg)
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

if not ns_entry:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

if not request.nsid:
try:
request.nsid = ns_entry["nsid"]
except Exception:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace NSID"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

create_image = False
ns_size = None
force = False
try:
create_image = ns_entry["create_image"]
ns_size = ns_entry["size"]
force = ns_entry["force"]
except Exception:
self.logger.warning(f"Can't get all the attributes for namespace {nsid_msg}in {request.subsystem_nqn}.")

ns_qos_entry = None
ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, request.nsid)
try:
state_ns_qos = state[ns_qos_key]
ns_qos_entry = json.loads(state_ns_qos)
except Exception:
self.logger.debug(f"No QOS limits found for namespace {nsid_msg}in {request.subsystem_nqn}")
if ns_qos_entry:
try:
if ns_qos_entry["subsystem_nqn"] != request.subsystem_nqn:
errmsg = f'Got subsystem {ns_qos_entry["subsystem_nqn"]} for QOS limits instead of {request.subsystem_nqn}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
if ns_qos_entry["nsid"] != request.nsid:
errmsg = f'Got NSID {ns_qos_entry["nsid"]} for QOS limits instead of {request.nsid}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
except Exception:
self.logger.error(f"Error processing QOS limits, will not set QOS")
ns_qos_entry = None

set_qos_req = None
if ns_qos_entry:
set_qos_req = pb2.namespace_set_qos_req()
set_qos_req.subsystem_nqn = request.subsystem_nqn
set_qos_req.nsid = request.nsid
if request.uuid:
set_qos_req.uuid = request.uuid
try:
set_qos_req.rw_ios_per_second = int(ns_qos_entry["rw_ios_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_ios_per_second")
try:
set_qos_req.rw_mbytes_per_second = int(ns_qos_entry["rw_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_mbytes_per_second")
try:
set_qos_req.r_mbytes_per_second = int(ns_qos_entry["r_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute r_mbytes_per_second")
try:
set_qos_req.w_mbytes_per_second = int(ns_qos_entry["w_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute w_mbytes_per_second")

namespace_add_req = pb2.namespace_add_req()
namespace_add_req.subsystem_nqn = request.subsystem_nqn
namespace_add_req.nsid = request.nsid
if request.uuid:
namespace_add_req.uuid = request.uuid
namespace_add_req.anagrpid = request.anagrpid
namespace_add_req.create_image = create_image
if ns_size:
namespace_add_req.size = int(ns_size)
namespace_add_req.force = force
errmsg = None
try:
namespace_add_req.rbd_pool_name=ns_entry["rbd_pool_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD pool name"
try:
namespace_add_req.rbd_image_name=ns_entry["rbd_image_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD image name"
try:
namespace_add_req.block_size=ns_entry["block_size"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find associated block device block size"

if errmsg:
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

del_req = pb2.namespace_delete_req(subsystem_nqn=request.subsystem_nqn, nsid=request.nsid, uuid=request.uuid)
ret_del = self.namespace_delete_safe(del_req, context)
if ret_del.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't delete namespace: {ret_del.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_del.status, error_message=errmsg)
ret = rpc_nvmf.nvmf_subsystem_set_ns_anagrpid(
self.spdk_rpc_client,
nqn=request.subsystem_nqn,
nsid=request.nsid,
anagrpid=request.anagrpid
)
self.subsystem_nsid_anagrp[request.subsystem_nqn][request.nsid] = request.anagrpid
self.logger.debug(f"nvmf_subsystem_set_ns_anagrpid: {ret}")
except Exception as ex:
errmsg = f"{change_lb_group_failure_prefix}:\n{ex}"
resp = self.parse_json_exeption(ex)
status = errno.EINVAL
if resp:
status = resp["code"]
errmsg = f"{change_lb_group_failure_prefix}: {resp['message']}"
return pb2.req_status(status=status, error_message=errmsg)

ret_ns = self.namespace_add_safe(namespace_add_req, context)
if ret_ns.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_ns.status, error_message=errmsg)
# Just in case SPDK failed with no exception
if not ret:
self.logger.error(change_lb_group_failure_prefix)
return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix)

if set_qos_req:
ret_qos = self.namespace_set_qos_limits_safe(set_qos_req, context)
if context:
assert ns_entry, "Namespace entry is None for non-update call"
# Update gateway state
try:
add_req = pb2.namespace_add_req(rbd_pool_name=ns_entry["rbd_pool_name"],
rbd_image_name=ns_entry["rbd_image_name"],
subsystem_nqn=ns_entry["subsystem_nqn"],
nsid=ns_entry["nsid"],
block_size=ns_entry["block_size"],
uuid=ns_entry["uuid"],
anagrpid=request.anagrpid,
create_image=ns_entry["create_image"],
size=int(ns_entry["size"]),
force=ns_entry["force"])
json_req = json_format.MessageToJson(
add_req, preserving_proto_field_name=True, including_default_value_fields=True)
self.gateway_state.add_namespace(request.subsystem_nqn, request.nsid, json_req)
except Exception as ex:
errmsg = f"Error persisting namespace load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.exception(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

return pb2.req_status(status=0, error_message=os.strerror(0))

Expand All @@ -1197,8 +1119,11 @@ def remove_namespace_from_state(self, nqn, nsid, context):
# Update gateway state
try:
self.gateway_state.remove_namespace_qos(nqn, str(nsid))
except Exception:
self.logger.warning(f"Error removing namespace's QOS limits, they might not have been set")
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace_lb_group(nqn, str(nsid))
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace(nqn, str(nsid))
Expand Down
4 changes: 4 additions & 0 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,7 @@ def gateway_rpc_caller(self, requests, is_add_req):
else:
req = json_format.Parse(val, pb2.delete_listener_req(), ignore_unknown_fields=True)
self.gateway_rpc.delete_listener(req)
elif key.startswith(GatewayState.NAMESPACE_LB_GROUP_PREFIX):
if is_add_req:
req = json_format.Parse(val, pb2.namespace_change_load_balancing_group_req(), ignore_unknown_fields=True)
self.gateway_rpc.namespace_change_load_balancing_group(req)
Loading
Loading