Skip to content

Commit

Permalink
Change namespace balancing group using a direct SPDK call.
Browse files Browse the repository at this point in the history
Fixes ceph#754

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Jul 11, 2024
1 parent c759f4f commit e06bf7b
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 159 deletions.
6 changes: 3 additions & 3 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,9 +1671,9 @@ def ns_change_load_balancing_group(self, args):
"""Change namespace load balancing group."""

out_func, err_func = self.get_output_functions(args)
if args.nsid == None and args.uuid == None:
self.cli.parser.error("At least one of --nsid or --uuid arguments is mandatory for change_load_balancing_group command")
if args.nsid != None and args.nsid <= 0:
if args.nsid == None:
self.cli.parser.error("--nsid argument is mandatory for change_load_balancing_group command")
if args.nsid <= 0:
self.cli.parser.error("nsid value must be positive")
if args.load_balancing_group <= 0:
self.cli.parser.error("load-balancing-group value must be positive")
Expand Down
2 changes: 1 addition & 1 deletion control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ def store_async(self, conn, data, cmd_id):
self_conn.recv_async = True
self_conn.async_cmd_id = cmd_id

def _state_notify_update(self, update, is_add_req):
def _state_notify_update(self, update, all_requests, is_add_req):
"""Notify and reply async event."""

should_send_async_event = False
Expand Down
187 changes: 43 additions & 144 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1026,160 +1026,56 @@ def namespace_change_load_balancing_group_safe(self, request, context):

grps_list = []
peer_msg = self.get_peer_message(context)
nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")
change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.info(f"Received request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

if not request.nsid:
errmsg = f"Failure changing load balancing group for namespace in {request.subsystem_nqn}: No NSID was given"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

ns_entry = None
state = self.gateway_state.local.get_state()
if request.nsid:
ns_key = GatewayState.build_namespace_key(request.subsystem_nqn, request.nsid)
try:
state_ns = state[ns_key]
ns_entry = json.loads(state_ns)
assert request.nsid == ns_entry["nsid"], f'Got a namespace with NSID {ns_entry["nsid"]} which is different than the requested one {request.nsid}'
assert request.subsystem_nqn == ns_entry["subsystem_nqn"], f'Got a namespace from subsystem {ns_entry["subsystem_nqn"]} which is different than the requested one {request.subsystem_nqn}'
except Exception as ex:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
self.logger.error(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)
elif request.uuid:
for key, val in state.items():
if not key.startswith(self.gateway_state.local.NAMESPACE_PREFIX):
continue
try:
ns = json.loads(val)
if ns["uuid"] == request.uuid:
ns_entry = ns
break
except Exception:
self.logger.exception(f"Got exception trying to get subsystem {nqn} namespaces")
pass
else:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. At least one of NSID or UUID should be specified"
self.logger.error(errmsg)
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

if not ns_entry:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace entry from local state"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

if not request.nsid:
try:
request.nsid = ns_entry["nsid"]
except Exception:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't get namespace NSID"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)

create_image = False
ns_size = None
force = False
try:
create_image = ns_entry["create_image"]
ns_size = ns_entry["size"]
force = ns_entry["force"]
except Exception:
self.logger.warning(f"Can't get all the attributes for namespace {nsid_msg}in {request.subsystem_nqn}.")

ns_qos_entry = None
ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, request.nsid)
try:
state_ns_qos = state[ns_qos_key]
ns_qos_entry = json.loads(state_ns_qos)
except Exception:
self.logger.debug(f"No QOS limits found for namespace {nsid_msg}in {request.subsystem_nqn}")
if ns_qos_entry:
try:
if ns_qos_entry["subsystem_nqn"] != request.subsystem_nqn:
errmsg = f'Got subsystem {ns_qos_entry["subsystem_nqn"]} for QOS limits instead of {request.subsystem_nqn}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
if ns_qos_entry["nsid"] != request.nsid:
errmsg = f'Got NSID {ns_qos_entry["nsid"]} for QOS limits instead of {request.nsid}, will not set QOS'
self.logger.error(errmsg)
ns_qos_entry = None
except Exception:
self.logger.error(f"Error processing QOS limits, will not set QOS")
ns_qos_entry = None

set_qos_req = None
if ns_qos_entry:
set_qos_req = pb2.namespace_set_qos_req()
set_qos_req.subsystem_nqn = request.subsystem_nqn
set_qos_req.nsid = request.nsid
if request.uuid:
set_qos_req.uuid = request.uuid
try:
set_qos_req.rw_ios_per_second = int(ns_qos_entry["rw_ios_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_ios_per_second")
try:
set_qos_req.rw_mbytes_per_second = int(ns_qos_entry["rw_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute rw_mbytes_per_second")
try:
set_qos_req.r_mbytes_per_second = int(ns_qos_entry["r_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute r_mbytes_per_second")
try:
set_qos_req.w_mbytes_per_second = int(ns_qos_entry["w_mbytes_per_second"])
except Exception:
self.logger.warning(f"Couldn't get QOS attribute w_mbytes_per_second")

namespace_add_req = pb2.namespace_add_req()
namespace_add_req.subsystem_nqn = request.subsystem_nqn
namespace_add_req.nsid = request.nsid
if request.uuid:
namespace_add_req.uuid = request.uuid
namespace_add_req.anagrpid = request.anagrpid
namespace_add_req.create_image = create_image
if ns_size:
namespace_add_req.size = int(ns_size)
namespace_add_req.force = force
errmsg = None
try:
namespace_add_req.rbd_pool_name=ns_entry["rbd_pool_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD pool name"
try:
namespace_add_req.rbd_image_name=ns_entry["rbd_image_name"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find RBD image name"
try:
namespace_add_req.block_size=ns_entry["block_size"]
except KeyError:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Can't find associated block device block size"

if errmsg:
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

del_req = pb2.namespace_delete_req(subsystem_nqn=request.subsystem_nqn, nsid=request.nsid, uuid=request.uuid)
ret_del = self.namespace_delete_safe(del_req, context)
if ret_del.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}. Can't delete namespace: {ret_del.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_del.status, error_message=errmsg)
ret = rpc_nvmf.nvmf_subsystem_set_ns_anagrpid(
self.spdk_rpc_client,
nqn=request.subsystem_nqn,
nsid=request.nsid,
anagrpid=request.anagrpid
)
self.logger.debug(f"nvmf_subsystem_set_ns_anagrpid: {ret}")
except Exception as ex:
errmsg = f"{change_lb_group_failure_prefix}:\n{ex}"
resp = self.parse_json_exeption(ex)
status = errno.EINVAL
if resp:
status = resp["code"]
errmsg = f"{change_lb_group_failure_prefix}: {resp['message']}"
return pb2.req_status(status=status, error_message=errmsg)

ret_ns = self.namespace_add_safe(namespace_add_req, context)
if ret_ns.status != 0:
errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}:{ret_ns.error_message}"
self.logger.error(errmsg)
return pb2.req_status(status=ret_ns.status, error_message=errmsg)
# Just in case SPDK failed with no exception
if not ret:
self.logger.error(change_lb_group_failure_prefix)
return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix)

if set_qos_req:
ret_qos = self.namespace_set_qos_limits_safe(set_qos_req, context)
if context:
# Update gateway state
try:
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True, including_default_value_fields=True)
self.gateway_state.add_namespace_lb_group(request.subsystem_nqn, request.nsid, json_req)
except Exception as ex:
errmsg = f"Error persisting namespace load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.exception(errmsg)
errmsg = f"{errmsg}:\n{ex}"
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

return pb2.req_status(status=0, error_message=os.strerror(0))

Expand All @@ -1197,8 +1093,11 @@ def remove_namespace_from_state(self, nqn, nsid, context):
# Update gateway state
try:
self.gateway_state.remove_namespace_qos(nqn, str(nsid))
except Exception:
self.logger.warning(f"Error removing namespace's QOS limits, they might not have been set")
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace_lb_group(nqn, str(nsid))
except Exception as ex:
pass
try:
self.gateway_state.remove_namespace(nqn, str(nsid))
Expand Down
37 changes: 36 additions & 1 deletion control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,21 @@ def _ping(self):
self.logger.exception(f"spdk_get_version failed")
return False

def gateway_rpc_caller(self, requests, is_add_req):
def find_latest_lb_group(self, lb_requests, nqn, nsid):
    """Look up a namespace's "change load balancing group" entry.

    Args:
        lb_requests: Mapping of state keys to JSON-encoded
            ``namespace_change_load_balancing_group_req`` messages. May be
            ``None`` or empty.
        nqn: NQN of the subsystem the namespace belongs to.
        nsid: NSID of the namespace.

    Returns:
        A ``(key, anagrpid)`` tuple, where ``key`` is the matching key in
        ``lb_requests`` and ``anagrpid`` is the group id parsed from its
        value. ``(None, None)`` is returned when there are no requests, when
        no entry exists for this namespace, or when the entry can't be parsed.
    """
    # Nothing to search, so don't bother building the key.
    if not lb_requests:
        return (None, None)

    key_to_find = GatewayState.build_namespace_lb_group_key(nqn, nsid)
    # Keys are unique, so a direct lookup replaces scanning every entry.
    try:
        val = lb_requests[key_to_find]
    except KeyError:
        return (None, None)

    try:
        req = json_format.Parse(val, pb2.namespace_change_load_balancing_group_req(), ignore_unknown_fields=True)
        return (key_to_find, req.anagrpid)
    except Exception as ex:
        # Best effort: a malformed entry is treated the same as a missing one.
        self.logger.debug(f"Got exception while looking for change load balancing group entry for namespace {nsid} in {nqn}:\n{ex}")
    return (None, None)

def gateway_rpc_caller(self, requests, all_requests, is_add_req):
"""Passes RPC requests to gateway service."""
for key, val in requests.items():
if key.startswith(GatewayState.SUBSYSTEM_PREFIX):
Expand All @@ -554,6 +568,20 @@ def gateway_rpc_caller(self, requests, is_add_req):
elif key.startswith(GatewayState.NAMESPACE_PREFIX):
if is_add_req:
req = json_format.Parse(val, pb2.namespace_add_req(), ignore_unknown_fields=True)
lb_requests = None
try:
lb_requests = all_requests[GatewayState.NAMESPACE_LB_GROUP_PREFIX]
except Exception as ex:
pass
(key_to_ignore, latest_lb_group_id) = self.find_latest_lb_group(lb_requests, req.subsystem_nqn, req.nsid)
if latest_lb_group_id:
self.logger.debug(f"Will change load balancing group for namespace {req.nsid} in {req.subsystem_nqn} from {req.anagrpid} to {latest_lb_group_id}")
req.anagrpid = latest_lb_group_id
if key_to_ignore:
try:
lb_requests.pop(key_to_ignore)
except Exception as ex:
pass
self.gateway_rpc.namespace_add(req)
else:
req = json_format.Parse(val,
Expand All @@ -567,6 +595,13 @@ def gateway_rpc_caller(self, requests, is_add_req):
else:
# Do nothing, this is covered by the delete namespace code
pass
elif key.startswith(GatewayState.NAMESPACE_LB_GROUP_PREFIX):
if is_add_req:
req = json_format.Parse(val, pb2.namespace_change_load_balancing_group_req(), ignore_unknown_fields=True)
self.gateway_rpc.namespace_change_load_balancing_group(req)
else:
# Do nothing, this is covered by the delete namespace code
pass
elif key.startswith(GatewayState.HOST_PREFIX):
if is_add_req:
req = json_format.Parse(val, pb2.add_host_req(), ignore_unknown_fields=True)
Expand Down
Loading

0 comments on commit e06bf7b

Please sign in to comment.