Skip to content

Commit

Permalink
Do not call SPDK for namespace details in namespace commands.
Browse files Browse the repository at this point in the history
Fixes ceph#449

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Jul 31, 2024
1 parent 887c784 commit f4e6f72
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 137 deletions.
227 changes: 91 additions & 136 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.cluster_nonce = {}
self.bdev_cluster = {}
self.bdev_params = {}
self.subsystem_nsid_bdev = defaultdict(dict)
self.subsystem_nsid_bdev_and_uuid = defaultdict(dict)
self.subsystem_listeners = defaultdict(set)
self._init_cluster_context()
self.subsys_max_ns = {}
Expand Down Expand Up @@ -354,9 +354,10 @@ def resize_bdev(self, bdev_name, new_size, peer_msg = ""):
"""Resizes a bdev."""

self.logger.info(f"Received request to resize bdev {bdev_name} to {new_size} MiB{peer_msg}")
assert self.rpc_lock.locked(), "RPC is unlocked when calling resize_bdev()"
rbd_pool_name = None
rbd_image_name = None
bdev_info = self.get_bdev_info(bdev_name, True)
bdev_info = self.get_bdev_info(bdev_name)
if bdev_info is not None:
try:
drv_specific_info = bdev_info["driver_specific"]
Expand All @@ -379,38 +380,35 @@ def resize_bdev(self, bdev_name, new_size, peer_msg = ""):
self.logger.warning(f"Error trying to get the size of image {rbd_pool_name}/{rbd_image_name}, won't check size for shrinkage:\n{ex}")
pass

with self.rpc_lock:
try:
ret = rpc_bdev.bdev_rbd_resize(
self.spdk_rpc_client,
name=bdev_name,
new_size=new_size,
)
self.logger.debug(f"resize_bdev {bdev_name}: {ret}")
except Exception as ex:
errmsg = f"Failure resizing bdev {bdev_name}"
self.logger.exception(errmsg)
errmsg = f"{errmsg}:\n{ex}"
resp = self.parse_json_exeption(ex)
status = errno.EINVAL
if resp:
status = resp["code"]
errmsg = f"Failure resizing bdev {bdev_name}: {resp['message']}"
return pb2.req_status(status=status, error_message=errmsg)
try:
ret = rpc_bdev.bdev_rbd_resize(
self.spdk_rpc_client,
name=bdev_name,
new_size=new_size,
)
self.logger.debug(f"resize_bdev {bdev_name}: {ret}")
except Exception as ex:
errmsg = f"Failure resizing bdev {bdev_name}"
self.logger.exception(errmsg)
errmsg = f"{errmsg}:\n{ex}"
resp = self.parse_json_exeption(ex)
status = errno.EINVAL
if resp:
status = resp["code"]
errmsg = f"Failure resizing bdev {bdev_name}: {resp['message']}"
return pb2.req_status(status=status, error_message=errmsg)

if not ret:
errmsg = f"Failure resizing bdev {bdev_name}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
if not ret:
errmsg = f"Failure resizing bdev {bdev_name}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

return pb2.req_status(status=0, error_message=os.strerror(0))
return pb2.req_status(status=0, error_message=os.strerror(0))

def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""):
"""Deletes a bdev."""

if not self.rpc_lock.locked():
self.logger.error(f"A call to delete_bdev() without holding the RPC lock")
assert self.rpc_lock.locked(), "RPC is unlocked when calling delete_bdev()"
assert self.rpc_lock.locked(), "RPC is unlocked when calling delete_bdev()"

self.logger.info(f"Received request to delete bdev {bdev_name}{peer_msg}")
try:
Expand Down Expand Up @@ -666,7 +664,8 @@ def delete_subsystem_safe(self, request, context):
)
self.subsys_max_ns.pop(request.subsystem_nqn)
if request.subsystem_nqn in self.subsystem_listeners:
self.subsystem_listeners.pop(request.subsystem_nqn)
self.subsystem_listeners.pop(request.subsystem_nqn, None)
self.remove_namespace_bdev_and_uuid_from_list(request.subsystem_nqn)
self.logger.debug(f"delete_subsystem {request.subsystem_nqn}: {ret}")
except Exception as ex:
self.logger.exception(delete_subsystem_error_prefix)
Expand Down Expand Up @@ -788,7 +787,7 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte
anagrpid=anagrpid,
uuid=uuid,
)
self.subsystem_nsid_bdev[subsystem_nqn][nsid] = bdev_name
self.add_namespace_bdev_and_uuid_to_list(subsystem_nqn, nsid, bdev_name, uuid)
self.logger.debug(f"subsystem_add_ns: {nsid}")
except Exception as ex:
self.logger.exception(add_namespace_error_prefix)
Expand Down Expand Up @@ -966,7 +965,7 @@ def namespace_add_safe(self, request, context):
errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
self.logger.error(errmsg)
# Delete the bdev just to be on the safe side
ns_bdev = self.get_bdev_info(bdev_name, False)
ns_bdev = self.get_bdev_info(bdev_name)
if ns_bdev != None:
try:
ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
Expand Down Expand Up @@ -1175,23 +1174,18 @@ def remove_namespace(self, subsystem_nqn, nsid, context):

return pb2.req_status(status=0, error_message=os.strerror(0))

def get_bdev_info(self, bdev_name, need_to_lock):
def get_bdev_info(self, bdev_name):
"""Get bdev info"""

assert self.rpc_lock.locked(), "RPC is unlocked when calling get_bdev_info()"
ret_bdev = None
if need_to_lock:
lock_to_use = self.rpc_lock
else:
lock_to_use = contextlib.suppress()

with lock_to_use:
try:
bdevs = rpc_bdev.bdev_get_bdevs(self.spdk_rpc_client, name=bdev_name)
if (len(bdevs) > 1):
self.logger.warning(f"Got {len(bdevs)} bdevs for bdev name {bdev_name}, will use the first one")
ret_bdev = bdevs[0]
except Exception:
self.logger.exception(f"Got exception while getting bdev {bdev_name} info")
try:
bdevs = rpc_bdev.bdev_get_bdevs(self.spdk_rpc_client, name=bdev_name)
if (len(bdevs) > 1):
self.logger.warning(f"Got {len(bdevs)} bdevs for bdev name {bdev_name}, will use the first one")
ret_bdev = bdevs[0]
except Exception:
self.logger.exception(f"Got exception while getting bdev {bdev_name} info")

return ret_bdev

Expand Down Expand Up @@ -1250,7 +1244,6 @@ def list_namespaces(self, request, context=None):
self.logger.debug(f'Filter out namespace with UUID {n["uuid"]} which is different than requested UUID {request.uuid}')
continue
bdev_name = n["bdev_name"]
ns_bdev = self.get_bdev_info(bdev_name, True)
lb_group = 0
try:
lb_group = n["anagrpid"]
Expand All @@ -1260,6 +1253,8 @@ def list_namespaces(self, request, context=None):
bdev_name = bdev_name,
uuid = n["uuid"],
load_balancing_group = lb_group)
with self.rpc_lock:
ns_bdev = self.get_bdev_info(bdev_name)
if ns_bdev == None:
self.logger.warning(f"Can't find namespace's bdev {bdev_name}, will not list bdev's information")
else:
Expand Down Expand Up @@ -1305,13 +1300,13 @@ def namespace_get_io_stats(self, request, context=None):
return pb2.namespace_io_stats_info(status=errno.EINVAL, error_message=errmsg)

with self.rpc_lock:
find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, False, "Failure getting namespace's IO stats")
ns = find_ret[0]
if not ns:
find_ret = self.find_namespace_bdev_and_uuid(request.subsystem_nqn, request.nsid)
uuid = find_ret["uuid"]
if not uuid:
errmsg = f"Failure getting IO stats for namespace {request.nsid} on {request.subsystem_nqn}: Can't find namespace"
self.logger.error(errmsg)
return pb2.namespace_io_stats_info(status=errno.ENODEV, error_message=errmsg)
bdev_name = find_ret[1]
bdev_name = find_ret["bdev"]
if not bdev_name:
errmsg = f"Failure getting IO stats for namespace {request.nsid} on {request.subsystem_nqn}: Can't find associated block device"
self.logger.error(errmsg)
Expand Down Expand Up @@ -1360,8 +1355,8 @@ def namespace_get_io_stats(self, request, context=None):
io_stats = pb2.namespace_io_stats_info(status=0,
error_message=os.strerror(0),
subsystem_nqn=request.subsystem_nqn,
nsid=ns["nsid"],
uuid=ns["uuid"],
nsid=request.nsid,
uuid=uuid,
bdev_name=bdev_name,
tick_rate=ret["tick_rate"],
ticks=ret["ticks"],
Expand Down Expand Up @@ -1423,17 +1418,16 @@ def namespace_set_qos_limits_safe(self, request, context):
self.logger.error(f"{errmsg}")
return pb2.namespace_io_stats_info(status=errno.EINVAL, error_message=errmsg)

find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, False, "Failure setting namespace's QOS limits")
if not find_ret[0]:
find_ret = self.find_namespace_bdev_and_uuid(request.subsystem_nqn, request.nsid)
if not find_ret["uuid"]:
errmsg = f"Failure setting QOS limits for namespace {request.nsid} on {request.subsystem_nqn}: Can't find namespace"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
bdev_name = find_ret[1]
bdev_name = find_ret["bdev"]
if not bdev_name:
errmsg = f"Failure setting QOS limits for namespace {request.nsid} on {request.subsystem_nqn}: Can't find associated block device"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
nsid = find_ret[0]["nsid"]

set_qos_limits_args = {}
set_qos_limits_args["name"] = bdev_name
Expand All @@ -1449,7 +1443,7 @@ def namespace_set_qos_limits_safe(self, request, context):
ns_qos_entry = None
if context:
state = self.gateway_state.local.get_state()
ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, nsid)
ns_qos_key = GatewayState.build_namespace_qos_key(request.subsystem_nqn, request.nsid)
try:
state_ns_qos = state[ns_qos_key]
ns_qos_entry = json.loads(state_ns_qos)
Expand Down Expand Up @@ -1499,7 +1493,7 @@ def namespace_set_qos_limits_safe(self, request, context):
try:
json_req = json_format.MessageToJson(
request, preserving_proto_field_name=True, including_default_value_fields=True)
self.gateway_state.add_namespace_qos(request.subsystem_nqn, nsid, json_req)
self.gateway_state.add_namespace_qos(request.subsystem_nqn, request.nsid, json_req)
except Exception as ex:
errmsg = f"Error persisting namespace QOS settings {request.nsid} on {request.subsystem_nqn}"
self.logger.exception(errmsg)
Expand All @@ -1512,72 +1506,27 @@ def namespace_set_qos_limits(self, request, context=None):
"""Set namespace's qos limits."""
return self.execute_grpc_function(self.namespace_set_qos_limits_safe, request, context)

def find_namespace_and_bdev_name(self, nqn, nsid, needs_lock, err_prefix):
if not nsid:
self.logger.error(f"{err_prefix}: NSID should be specified for finding a namesapce")
return (None, None)

if nsid <= 0:
self.logger.error(f"{err_prefix}: NSID should be positive")
return (None, None)

if needs_lock:
lock_to_use = self.rpc_lock
else:
if not self.rpc_lock.locked():
self.logger.error(f"A call to find_namespace_and_bdev_name() with 'needs_lock' set to False and without holding the RPC lock")
assert self.rpc_lock.locked(), "RPC is unlocked when calling find_namespace_and_bdev_name()"
lock_to_use = contextlib.suppress()

with lock_to_use:
try:
ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client, nqn=nqn)
self.logger.debug(f"find_namespace_and_bdev_name: {ret}")
except Exception as ex:
self.logger.exception(err_prefix)
errmsg = f"{err_prefix}:\n{ex}"
return (None, None)

if not ret:
return (None, None)
def remove_namespace_bdev_and_uuid_from_list(self, nqn, nsid=None):
if nqn in self.subsystem_nsid_bdev_and_uuid:
if nsid:
if nsid in self.subsystem_nsid_bdev_and_uuid[nqn]:
self.subsystem_nsid_bdev_and_uuid[nqn].pop(nsid, None)
else:
self.subsystem_nsid_bdev_and_uuid.pop(nqn, None)

bdev_name = None
found_ns = None
for s in ret:
try:
if s["nqn"] != nqn:
self.logger.warning(f'Got subsystem {s["nqn"]} instead of {nqn}, ignore')
continue
try:
ns_list = s["namespaces"]
except Exception:
ns_list = []
pass
for n in ns_list:
if nsid != n["nsid"]:
continue
found_ns = n
break
break
except Exception:
self.logger.exception(f"{s=} parse error")
pass
def add_namespace_bdev_and_uuid_to_list(self, nqn, nsid, bdev, uuid):
self.subsystem_nsid_bdev_and_uuid[nqn][nsid] = dict(uuid = uuid, bdev = bdev)

uuid = None
if found_ns:
try:
uuid = found_ns["uuid"]
except Exception:
self.logger.exception(f"{found_ns=} parse error")
uuid = None
pass
def find_namespace_bdev_and_uuid(self, nqn, nsid):
if nqn not in self.subsystem_nsid_bdev_and_uuid:
return dict(uuid=None, bdev=None)

if uuid:
bdev_name = self.find_unique_bdev_name(uuid)
if nsid not in self.subsystem_nsid_bdev_and_uuid[nqn]:
return dict(uuid=None, bdev=None)

return (found_ns, bdev_name)
return self.subsystem_nsid_bdev_and_uuid[nqn][nsid]

def namespace_resize(self, request, context=None):
def namespace_resize_safe(self, request, context=None):
"""Resize a namespace."""

peer_msg = self.get_peer_message(context)
Expand All @@ -1598,12 +1547,12 @@ def namespace_resize(self, request, context=None):
self.logger.error(f"{errmsg}")
return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, True, "Failure resizing namespace")
if not find_ret[0]:
find_ret = self.find_namespace_bdev_and_uuid(request.subsystem_nqn, request.nsid)
if not find_ret["uuid"]:
errmsg = f"Failure resizing namespace {request.nsid} on {request.subsystem_nqn}: Can't find namespace"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
bdev_name = find_ret[1]
bdev_name = find_ret["bdev"]
if not bdev_name:
errmsg = f"Failure resizing namespace {request.nsid} on {request.subsystem_nqn}: Can't find associated block device"
self.logger.error(errmsg)
Expand All @@ -1615,10 +1564,17 @@ def namespace_resize(self, request, context=None):
errmsg = os.strerror(0)
else:
errmsg = f"Failure resizing namespace {request.nsid} on {request.subsystem_nqn}: {ret.error_message}"
if ret.status == errno.ENODEV:
self.logger.error(f"Bdev {bdev_name} is not found, will remove namespace {request.nsid} from local list")
self.remove_namespace_bdev_and_uuid_from_list(request.subsystem_nqn, request.nsid)
self.logger.error(errmsg)

return pb2.req_status(status=ret.status, error_message=errmsg)

def namespace_resize(self, request, context=None):
"""Resize a namespace."""
return self.execute_grpc_function(self.namespace_resize_safe, request, context)

def namespace_delete_safe(self, request, context):
"""Delete a namespace."""

Expand All @@ -1635,24 +1591,23 @@ def namespace_delete_safe(self, request, context):
peer_msg = self.get_peer_message(context)
self.logger.info(f"Received request to delete namespace {request.nsid} from {request.subsystem_nqn}, context: {context}{peer_msg}")

find_ret = self.find_namespace_bdev_and_uuid(request.subsystem_nqn, request.nsid)
if not find_ret["uuid"]:
errmsg = f"Failure deleting namespace: Can't find namespace"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
bdev_name = find_ret["bdev"]
if not bdev_name:
self.logger.warning(f"Can't find namespace's bdev name, will try to delete namespace anyway")

omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, False, "Failure deleting namespace")
if not find_ret[0]:
errmsg = f"Failure deleting namespace: Can't find namespace"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
bdev_name = find_ret[1]
if not bdev_name:
self.logger.warning(f"Can't find namespace's bdev name, will try to delete namespace anyway")

ns = find_ret[0]
nsid = ns["nsid"]
ret = self.remove_namespace(request.subsystem_nqn, nsid, context)
ret = self.remove_namespace(request.subsystem_nqn, request.nsid, context)
if ret.status != 0:
return ret

self.remove_namespace_from_state(request.subsystem_nqn, nsid, context)
self.remove_namespace_from_state(request.subsystem_nqn, request.nsid, context)
self.remove_namespace_bdev_and_uuid_from_list(request.subsystem_nqn, request.nsid)
if bdev_name:
ret_del = self.delete_bdev(bdev_name, peer_msg = peer_msg)
if ret_del.status != 0:
Expand Down Expand Up @@ -2454,7 +2409,7 @@ def get_subsystems_safe(self, request, context):
for n in s[ns_key]:
nqn = s["nqn"]
nsid = n["nsid"]
bdev = self.subsystem_nsid_bdev[nqn][nsid]
bdev = self.subsystem_nsid_bdev_and_uuid[nqn][nsid]["bdev"]
nonce = self.cluster_nonce[self.bdev_cluster[bdev]]
n["nonce"] = nonce
# Parse the JSON dictionary into the protobuf message
Expand Down
Loading

0 comments on commit f4e6f72

Please sign in to comment.