From 1082f0d3460002b4e86bbf26915eb10f43d726f5 Mon Sep 17 00:00:00 2001 From: Gil Bregman Date: Tue, 10 Oct 2023 09:08:10 -0400 Subject: [PATCH] Add a lock for GRPC calls to prevent corruption and exceptions on gateway restart. Fixes #255 Signed-off-by: Gil Bregman --- control/grpc.py | 97 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 84 insertions(+), 13 deletions(-) diff --git a/control/grpc.py b/control/grpc.py index f85c39b8e..1c38b8222 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -14,6 +14,7 @@ import random import logging import os +import threading import spdk.rpc.bdev as rpc_bdev import spdk.rpc.nvmf as rpc_nvmf @@ -36,6 +37,29 @@ class GatewayService(pb2_grpc.GatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ + class RPCGuard: + RPC_GUARD_LOCK_TIMEOUT = 300 + + def __init__(self, logger, timeout = None) -> None: + self.rpc_lock = threading.Lock() + self.lock_timeout = timeout if timeout != None else self.RPC_GUARD_LOCK_TIMEOUT + self.logger = logger + + def __enter__(self): + rc = self.rpc_lock.acquire(True, self.lock_timeout) + if not rc: + self.logger.warning(f"Couldn't acquire lock after {self.lock_timeout} seconds, will try again") + rc = self.rpc_lock.acquire(True, self.lock_timeout) + if not rc: + self.logger.error(f"Failed to acquire lock for guarding RPC, will continue anyway") + return self + + def __exit__(self, typ, value, traceback): + if self.rpc_lock.locked(): + self.rpc_lock.release() + else: + self.logger.warning(f"Asked to release an unlocked RPC guard, ignore") + def __init__(self, config, gateway_state, spdk_rpc_client) -> None: """Constructor""" self.logger = logging.getLogger(__name__) @@ -44,6 +68,7 @@ def __init__(self, config, gateway_state, spdk_rpc_client) -> None: self.logger.info(f"Using NVMeoF gateway version {ver}") self.config = config self.logger.info(f"Using configuration file {config.filepath}") + self.rpc_lock = GatewayService.RPCGuard(self.logger) self.gateway_state = gateway_state self.spdk_rpc_client = spdk_rpc_client self.gateway_name = self.config.get("gateway", "name") @@ -91,7 +116,7 @@ def _alloc_cluster(self) -> str: ) return name - def create_bdev(self, request, context=None): + def create_bdev_safe(self, request, context=None): """Creates a bdev from an RBD image.""" if not request.uuid: @@ -132,13 +157,18 @@ def create_bdev(self, request, context=None): return pb2.bdev(bdev_name=bdev_name, status=True) - def delete_bdev(self, request, context=None): + def create_bdev(self, request, context=None): + with self.rpc_lock: + return self.create_bdev_safe(request, context) + + def delete_bdev_safe(self, request, context=None): """Deletes a bdev.""" self.logger.info(f"Received request to delete bdev {request.bdev_name}") use_excep = None req_get_subsystems = pb2.get_subsystems_req() - ret = self.get_subsystems(req_get_subsystems, context) + # We already hold the lock, so call the safe version, do not try lock again + ret = self.get_subsystems_safe(req_get_subsystems, context) subsystems = json.loads(ret.subsystems) for subsystem in subsystems: for namespace in subsystem['namespaces']: @@ -149,7 +179,8 @@ def delete_bdev(self, request, context=None): self.logger.info(f"Will remove namespace {namespace['nsid']} from {subsystem['nqn']} as it is using bdev {request.bdev_name}") try: req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=subsystem['nqn'], nsid=namespace['nsid']) - ret = self.remove_namespace(req_rm_ns, context) + # We already hold the lock, so call the safe version, do not try lock again + ret = self.remove_namespace_safe(req_rm_ns, context) self.logger.info( f"Removed namespace {namespace['nsid']} from {subsystem['nqn']}: {ret.status}") except Exception as ex: @@ -191,7 +222,11 @@ def delete_bdev(self, request, context=None): return pb2.req_status(status=ret) - def create_subsystem(self, request, context=None): + def delete_bdev(self, request, context=None): + with self.rpc_lock: + return self.delete_bdev_safe(request, context) + + def create_subsystem_safe(self, request, context=None): """Creates a subsystem.""" self.logger.info( @@ -233,7 +268,11 @@ def create_subsystem(self, request, context=None): return pb2.req_status(status=ret) - def delete_subsystem(self, request, context=None): + def create_subsystem(self, request, context=None): + with self.rpc_lock: + return self.create_subsystem_safe(request, context) + + def delete_subsystem_safe(self, request, context=None): """Deletes a subsystem.""" self.logger.info( @@ -262,7 +301,11 @@ def delete_subsystem(self, request, context=None): return pb2.req_status(status=ret) - def add_namespace(self, request, context=None): + def delete_subsystem(self, request, context=None): + with self.rpc_lock: + return self.delete_subsystem_safe(request, context) + + def add_namespace_safe(self, request, context=None): """Adds a namespace to a subsystem.""" self.logger.info(f"Received request to add {request.bdev_name} to" @@ -298,7 +341,11 @@ def add_namespace(self, request, context=None): return pb2.nsid(nsid=nsid, status=True) - def remove_namespace(self, request, context=None): + def add_namespace(self, request, context=None): + with self.rpc_lock: + return self.add_namespace_safe(request, context) + + def remove_namespace_safe(self, request, context=None): """Removes a namespace from a subsystem.""" self.logger.info(f"Received request to remove {request.nsid} from" @@ -329,7 +376,11 @@ def remove_namespace(self, request, context=None): return pb2.req_status(status=ret) - def add_host(self, request, context=None): + def remove_namespace(self, request, context=None): + with self.rpc_lock: + return self.remove_namespace_safe(request, context) + + def add_host_safe(self, request, context=None): """Adds a host to a subsystem.""" try: @@ -373,7 +424,11 @@ def add_host(self, request, context=None): return pb2.req_status(status=ret) - def remove_host(self, request, context=None): + def add_host(self, request, context=None): + with self.rpc_lock: + return self.add_host_safe(request, context) + + def remove_host_safe(self, request, context=None): """Removes a host from a subsystem.""" try: @@ -415,7 +470,11 @@ def remove_host(self, request, context=None): return pb2.req_status(status=ret) - def create_listener(self, request, context=None): + def remove_host(self, request, context=None): + with self.rpc_lock: + return self.remove_host_safe(request, context) + + def create_listener_safe(self, request, context=None): """Creates a listener for a subsystem at a given IP/Port.""" ret = True @@ -459,7 +518,11 @@ def create_listener(self, request, context=None): return pb2.req_status(status=ret) - def delete_listener(self, request, context=None): + def create_listener(self, request, context=None): + with self.rpc_lock: + return self.create_listener_safe(request, context) + + def delete_listener_safe(self, request, context=None): """Deletes a listener from a subsystem at a given IP/Port.""" ret = True @@ -502,7 +565,11 @@ def delete_listener(self, request, context=None): return pb2.req_status(status=ret) - def get_subsystems(self, request, context): + def delete_listener(self, request, context=None): + with self.rpc_lock: + return self.delete_listener_safe(request, context) + + def get_subsystems_safe(self, request, context): """Gets subsystems.""" self.logger.info(f"Received request to get subsystems") @@ -516,3 +583,7 @@ def get_subsystems(self, request, context): return pb2.subsystems_info() return pb2.subsystems_info(subsystems=json.dumps(ret)) + + def get_subsystems(self, request, context): + with self.rpc_lock: + return self.get_subsystems_safe(request, context)