Make sure to unlock OMAP file when we're done writing to it #690

Merged 1 commit on May 30, 2024
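
The recurring change in this PR: every gRPC handler that previously entered the OMAP lock directly via `with self.omap_lock(context=context):` now fetches an explicit lock handle with `get_omap_lock_to_use(context)` and holds it in a `with` block, while `_grpc_function_with_lock` asserts that the OMAP file is unlocked once the wrapped handler returns. A minimal, runnable sketch of that shape follows; the names are hypothetical stand-ins (the real lock object guards an OMAP file in RADOS, not a `threading.Lock`):

    import contextlib
    import threading

    class OmapLockSketch:
        """Stand-in for the gateway's OMAP lock object (illustration only)."""

        def __init__(self):
            self._lock = threading.Lock()
            self.omap_file_disable_unlock = False

        def locked(self):
            return self._lock.locked()

        def get_omap_lock_to_use(self, context):
            # External gRPC calls carry a context and need the OMAP lock;
            # internally generated calls pass context=None and get a no-op.
            return self._lock if context else contextlib.suppress()

    omap_lock_obj = OmapLockSketch()
    omap_lock = omap_lock_obj.get_omap_lock_to_use(context=object())
    with omap_lock:
        pass  # the SPDK RPC call plus the OMAP state update would go here
    assert not omap_lock_obj.locked()  # the invariant this PR enforces

Returning a no-op context manager for internal calls keeps a single code path in the handlers while ensuring only externally driven requests take, and therefore must release, the OMAP lock.
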
96 changes: 58 additions & 38 deletions control/grpc.py
@@ -227,7 +227,7 @@ def _put_cluster(self, name: str) -> None:
                 self.logger.info(f"put_cluster {name=} number bdevs: {self.clusters[anagrp][name]}")
                 return

-        assert False # we should find the cluster in our state
+        assert False, f"Cluster {name} is not found" # we should find the cluster in our state

     def _alloc_cluster_name(self, anagrp: int) -> str:
         """Allocates a new cluster name for ana group"""
@@ -253,7 +253,10 @@ def _alloc_cluster(self, anagrp: int) -> str:

     def _grpc_function_with_lock(self, func, request, context):
         with self.rpc_lock:
-            return func(request, context)
+            rc = func(request, context)
+            if not self.omap_lock.omap_file_disable_unlock:
+                assert not self.omap_lock.locked(), f"OMAP is still locked when we're out of function {func}"
+            return rc

     def execute_grpc_function(self, func, request, context):
         """This functions handles RPC lock by wrapping 'func' with
@@ -263,7 +266,7 @@ def execute_grpc_function(self, func, request, context):
         """
         return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context)

-    def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, peer_msg = ""):
+    def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, block_size, create_image, rbd_image_size, context, peer_msg = ""):
         """Creates a bdev from an RBD image."""

         if create_image:
@@ -273,7 +276,7 @@ def create_bdev(self, anagrp: int, name, uuid, rbd_pool_name, rbd_image_name, bl

         self.logger.info(f"Received request to create bdev {name} from"
                          f" {rbd_pool_name}/{rbd_image_name} (size {rbd_image_size} bytes)"
-                         f" with block size {block_size}, {cr_img_msg}{peer_msg}")
+                         f" with block size {block_size}, {cr_img_msg}, context={context}{peer_msg}")

         if block_size == 0:
             return BdevStatus(status=errno.EINVAL,
@@ -410,7 +413,7 @@ def delete_bdev(self, bdev_name, recycling_mode=False, peer_msg=""):

         if not self.rpc_lock.locked():
             self.logger.error(f"A call to delete_bdev() without holding the RPC lock")
-        assert self.rpc_lock.locked()
+        assert self.rpc_lock.locked(), "RPC is unlocked when calling delete_bdev()"

         self.logger.info(f"Received request to delete bdev {bdev_name}{peer_msg}")
         try:
@@ -540,7 +543,8 @@ def create_subsystem_safe(self, request, context):
             self.logger.info(f"No serial number specified for {request.subsystem_nqn}, will use {request.serial_number}")

         ret = False
-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             errmsg = ""
             try:
                 subsys_using_serial = None
Expand Down Expand Up @@ -653,7 +657,8 @@ def delete_subsystem_safe(self, request, context):
delete_subsystem_error_prefix = f"Failure deleting subsystem {request.subsystem_nqn}"

ret = False
with self.omap_lock(context=context):
omap_lock = self.omap_lock.get_omap_lock_to_use(context)
with omap_lock:
try:
ret = rpc_nvmf.nvmf_delete_subsystem(
self.spdk_rpc_client,
@@ -744,7 +749,7 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte
         """Adds a namespace to a subsystem."""

         if context:
-            assert self.omap_lock.locked()
+            assert self.omap_lock.locked(), "OMAP is unlocked when calling create_namespace()"
         nsid_msg = ""
         if nsid and uuid:
             nsid_msg = f" using NSID {nsid} and UUID {uuid}"
@@ -756,7 +761,7 @@
         add_namespace_error_prefix = f"Failure adding namespace{nsid_msg}to {subsystem_nqn}"

         peer_msg = self.get_peer_message(context)
-        self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}{peer_msg}")
+        self.logger.info(f"Received request to add {bdev_name} to {subsystem_nqn} with ANA group id {anagrpid}{nsid_msg}, context: {context}{peer_msg}")

         if anagrpid > self.subsys_max_ns[subsystem_nqn]:
             errmsg = f"{add_namespace_error_prefix}: Group ID {anagrpid} is bigger than configured maximum {self.subsys_max_ns[subsystem_nqn]}"
@@ -908,14 +913,24 @@ def choose_anagrpid_for_namespace(self, nsid) ->int:
     def namespace_add_safe(self, request, context):
         """Adds a namespace to a subsystem."""

+        grps_list = []
+        anagrp = 0
         peer_msg = self.get_peer_message(context)
         nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
-        self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")
+        self.logger.info(f"Received request to add a namespace {nsid_msg}to {request.subsystem_nqn}, ana group {request.anagrpid} context: {context}{peer_msg}")

         if not request.uuid:
             request.uuid = str(uuid.uuid4())

-        with self.omap_lock(context=context):
+        if context:
+            if request.anagrpid != 0:
+                grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
+            else:
+                anagrp = self.choose_anagrpid_for_namespace(request.nsid)
+                assert anagrp != 0, "Chosen ANA group is 0"
+
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             if context:
                 errmsg, ns_nqn = self.check_if_image_used(request.rbd_pool_name, request.rbd_image_name)
                 if errmsg and ns_nqn:
@@ -934,24 +949,17 @@
             else: # new namespace
                 # If an explicit load balancing group was passed, make sure it exists
                 if request.anagrpid != 0:
-                    grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
                     if request.anagrpid not in grps_list:
                         self.logger.debug(f"ANA groups: {grps_list}")
                         errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
                         self.logger.error(errmsg)
                         return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
                 else:
-                    anagrp = self.choose_anagrpid_for_namespace(request.nsid)
-                    assert anagrp != 0
-                    # if anagrp == 0:
-                    #    errmsg = f"Failure adding namespace with automatic ana group load balancing {nsid_msg} to {request.subsystem_nqn}"
-                    #    self.logger.error(errmsg)
-                    #    return pb2.req_status(status=errno.EINVAL, error_message=errmsg)
                     request.anagrpid = anagrp

                 anagrp = request.anagrpid
                 ret_bdev = self.create_bdev(anagrp, bdev_name, request.uuid, request.rbd_pool_name,
-                                            request.rbd_image_name, request.block_size, create_image, request.size, peer_msg)
+                                            request.rbd_image_name, request.block_size, create_image, request.size, context, peer_msg)
                 if ret_bdev.status != 0:
                     errmsg = f"Failure adding namespace {nsid_msg}to {request.subsystem_nqn}: {ret_bdev.error_message}"
                     self.logger.error(errmsg)
@@ -1015,12 +1023,14 @@ def namespace_add(self, request, context=None):
     def namespace_change_load_balancing_group_safe(self, request, context):
         """Changes a namespace load balancing group."""

+        grps_list = []
         peer_msg = self.get_peer_message(context)
         nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
         self.logger.info(f"Received request to change load balancing group for namespace {nsid_msg}in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

-        with self.omap_lock(context=context):
-            grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
+        grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             if request.anagrpid not in grps_list:
                 self.logger.debug(f"ANA groups: {grps_list}")
                 errmsg = f"Failure changing load balancing group for namespace {nsid_msg}in {request.subsystem_nqn}: Load balancing group {request.anagrpid} doesn't exist"
@@ -1139,7 +1149,7 @@ def remove_namespace_from_state(self, nqn, nsid, context):
             return pb2.req_status(status=0, error_message=os.strerror(0))

         # If we got here context is not None, so we must hold the OMAP lock
-        assert self.omap_lock.locked()
+        assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_namespace_from_state()"

         # Update gateway state
         try:
@@ -1160,7 +1170,7 @@ def remove_namespace(self, subsystem_nqn, nsid, context):
         """Removes a namespace from a subsystem."""

         if context:
-            assert self.omap_lock.locked()
+            assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_namespace()"
         peer_msg = self.get_peer_message(context)
         namespace_failure_prefix = f"Failure removing namespace {nsid} from {subsystem_nqn}"
         self.logger.info(f"Received request to remove namespace {nsid} from {subsystem_nqn}{peer_msg}")
@@ -1469,7 +1479,8 @@ def namespace_set_qos_limits_safe(self, request, context):
         limits_to_set = self.get_qos_limits_string(request)
         self.logger.debug(f"After merging current QOS limits with previous ones for namespace {nsid_msg}on {request.subsystem_nqn},{limits_to_set}")

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 ret = rpc_bdev.bdev_set_qos_limit(
                     self.spdk_rpc_client,
@@ -1520,7 +1531,7 @@ def find_namespace_and_bdev_name(self, nqn, nsid, uuid, needs_lock, err_prefix):
         else:
             if not self.rpc_lock.locked():
                 self.logger.error(f"A call to find_namespace_and_bdev_name() with 'needs_lock' set to False and without holding the RPC lock")
-            assert self.rpc_lock.locked()
+            assert self.rpc_lock.locked(), "RPC is unlocked when calling find_namespace_and_bdev_name()"
             lock_to_use = contextlib.suppress()

         with lock_to_use:
@@ -1607,7 +1618,7 @@ def namespace_recycle_safe(self, ana_id, peer_msg = "") ->int:
                 self.logger.info(f"nsid {ns_key} for nqn {subsys} to recycle:")
                 nsid = ns_key
                 bdev_name = self.subsystem_nsid_bdev[subsys][nsid]
-                assert bdev_name
+                assert bdev_name, f"Can't find bdev for subsystem {subsys}, namespace {nsid}"
                 ns_params = {'nsid':nsid, 'bdev_name':bdev_name, 'subsys':subsys}
                 list_ns_params.append(ns_params)
                 self.logger.info(f"nsid :{nsid}, pool_name: {self.bdev_params[bdev_name]['pool_name']}, rbd_name: {self.bdev_params[bdev_name]['image_name']}, block_size: {self.bdev_params[bdev_name]['block_size']}, uuid:{self.bdev_params[bdev_name]['uuid']}, anagrpid:{ana_id}")
@@ -1627,7 +1638,7 @@
             self.logger.info(f"ns params: {ns_params} ")
             ret_bdev = self.create_bdev( ana_id, bdev_name, self.bdev_params[bdev_name]['uuid'], self.bdev_params[bdev_name]['pool_name'],
                                          self.bdev_params[bdev_name]['image_name'], self.bdev_params[bdev_name]['block_size'], False,
-                                         self.bdev_params[bdev_name]['image_size'], peer_msg)
+                                         self.bdev_params[bdev_name]['image_size'], None, peer_msg)
             self.logger.info(f"bdev_rbd_create: {bdev_name}")
             if ret_bdev.status != 0:
                 errmsg = f"Failure adding bdev {bdev_name} "
@@ -1655,7 +1666,8 @@ def namespace_delete_safe(self, request, context):
         nsid_msg = self.get_ns_id_message(request.nsid, request.uuid)
         self.logger.info(f"Received request to delete namespace {nsid_msg}from {request.subsystem_nqn}, context: {context}{peer_msg}")

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             find_ret = self.find_namespace_and_bdev_name(request.subsystem_nqn, request.nsid, request.uuid, False,
                                                          "Failure deleting namespace")
             if not find_ret[0]:
@@ -1733,7 +1745,8 @@ def add_host_safe(self, request, context):
             self.logger.error(f"{errmsg}")
             return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn)
                 if host_already_exist:
@@ -1812,7 +1825,7 @@ def remove_host_from_state(self, subsystem_nqn, host_nqn, context):
             return pb2.req_status(status=0, error_message=os.strerror(0))

         if context:
-            assert self.omap_lock.locked()
+            assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_host_from_state()"
         # Update gateway state
         try:
             self.gateway_state.remove_host(subsystem_nqn, host_nqn)
@@ -1846,7 +1859,8 @@ def remove_host_safe(self, request, context):
             self.logger.error(f"{errmsg}")
             return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 if request.host_nqn == "*": # Disable allow any host access
                     self.logger.info(
@@ -2105,7 +2119,8 @@ def create_listener_safe(self, request, context):
             self.logger.error(f"{errmsg}")
             return pb2.req_status(status=errno.EINVAL, error_message=errmsg)

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 if request.host_name == self.host_name:
                     if (adrfam, request.traddr, request.trsvcid) in self.subsystem_listeners[request.nqn]:
@@ -2217,7 +2232,7 @@ def remove_listener_from_state(self, nqn, host_name, traddr, port, context):
             return pb2.req_status(status=0, error_message=os.strerror(0))

         if context:
-            assert self.omap_lock.locked()
+            assert self.omap_lock.locked(), "OMAP is unlocked when calling remove_listener_from_state()"

         host_name = host_name.strip()
         listener_hosts = []
@@ -2309,7 +2324,8 @@ def delete_listener_safe(self, request, context):
             self.logger.error(errmsg)
             return pb2.req_status(status=errno.ENOTEMPTY, error_message=errmsg)

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 if request.host_name == self.host_name or request.force:
                     ret = rpc_nvmf.nvmf_subsystem_remove_listener(
@@ -2361,7 +2377,8 @@ def list_listeners_safe(self, request, context):
         self.logger.info(f"Received request to list listeners for {request.subsystem}, context: {context}{peer_msg}")

         listeners = []
-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             state = self.gateway_state.local.get_state()
             listener_prefix = GatewayState.build_partial_listener_key(request.subsystem)
             for key, val in state.items():
@@ -2487,7 +2504,8 @@ def get_spdk_nvmf_log_flags_and_level_safe(self, request, context):
         peer_msg = self.get_peer_message(context)
         self.logger.info(f"Received request to get SPDK nvmf log flags and level{peer_msg}")
         log_flags = []
-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 nvmf_log_flags = {key: value for key, value in rpc_log.log_get_flags(
                     self.spdk_rpc_client).items() if key.startswith('nvmf')}
@@ -2544,7 +2562,8 @@ def set_spdk_nvmf_logs_safe(self, request, context):

         self.logger.info(f"Received request to set SPDK nvmf logs: log_level: {log_level}, print_level: {print_level}{peer_msg}")

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 nvmf_log_flags = [key for key in rpc_log.log_get_flags(self.spdk_rpc_client).keys() \
                                   if key.startswith('nvmf')]
@@ -2592,7 +2611,8 @@ def disable_spdk_nvmf_logs_safe(self, request, context):
         peer_msg = self.get_peer_message(context)
         self.logger.info(f"Received request to disable SPDK nvmf logs{peer_msg}")

-        with self.omap_lock(context=context):
+        omap_lock = self.omap_lock.get_omap_lock_to_use(context)
+        with omap_lock:
             try:
                 nvmf_log_flags = [key for key in rpc_log.log_get_flags(self.spdk_rpc_client).keys() \
                                   if key.startswith('nvmf')]
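
Combined with the per-handler conversion above, the new assertion in `_grpc_function_with_lock` turns a leaked OMAP lock into an immediate failure rather than a stale lock that other gateways must time out on. A self-contained sketch of that guard with a deliberately leaky handler (all names here are illustrative stand-ins, not the gateway's real classes):

    import threading

    class LeakyOmapLock:
        """Illustrative stand-in; bad_handler below leaves it locked on purpose."""

        def __init__(self):
            self._lock = threading.Lock()
            self.omap_file_disable_unlock = False

        def locked(self):
            return self._lock.locked()

        def acquire(self):
            self._lock.acquire()

    def call_with_unlock_check(omap_lock, func, request, context):
        # Mirrors the new _grpc_function_with_lock: run the handler, then
        # verify it did not leak the OMAP lock past its return.
        rc = func(request, context)
        if not omap_lock.omap_file_disable_unlock:
            assert not omap_lock.locked(), f"OMAP is still locked when we're out of function {func}"
        return rc

    lock = LeakyOmapLock()

    def bad_handler(request, context):
        lock.acquire()  # takes the lock and "forgets" to release it
        return 0

    try:
        call_with_unlock_check(lock, bad_handler, request=None, context=object())
    except AssertionError as err:
        print(f"caught lock leak: {err}")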