diff --git a/control/cli.py b/control/cli.py index 873432dcc..e9e086b0c 100644 --- a/control/cli.py +++ b/control/cli.py @@ -226,7 +226,7 @@ def add_namespace(self, args): def remove_namespace(self, args): """Removes a namespace from a subsystem.""" req = pb2.remove_namespace_req(subsystem_nqn=args.subnqn, - nsid=args.nsid, already_locked=False) + nsid=args.nsid) ret = self.stub.remove_namespace(req) self.logger.info( f"Removed namespace {args.nsid} from {args.subnqn}:" diff --git a/control/grpc.py b/control/grpc.py index 6373333fa..e1876e2ba 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -126,46 +126,49 @@ def _alloc_cluster(self) -> str: ) return name - def create_bdev(self, request, context=None): + def create_bdev_safe(self, request, context=None): """Creates a bdev from an RBD image.""" - with self.rpc_lock: - if not request.uuid: - request.uuid = str(uuid.uuid4()) + if not request.uuid: + request.uuid = str(uuid.uuid4()) - name = request.uuid if not request.bdev_name else request.bdev_name - self.logger.info(f"Received request to create bdev {name} from" - f" {request.rbd_pool_name}/{request.rbd_image_name}" - f" with block size {request.block_size}") + name = request.uuid if not request.bdev_name else request.bdev_name + self.logger.info(f"Received request to create bdev {name} from" + f" {request.rbd_pool_name}/{request.rbd_image_name}" + f" with block size {request.block_size}") + try: + bdev_name = rpc_bdev.bdev_rbd_create( + self.spdk_rpc_client, + name=name, + cluster_name=self._get_cluster(), + pool_name=request.rbd_pool_name, + rbd_name=request.rbd_image_name, + block_size=request.block_size, + uuid=request.uuid, + ) + self.logger.info(f"create_bdev: {bdev_name}") + except Exception as ex: + self.logger.error(f"create_bdev failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.bdev() + + if context: + # Update gateway state try: - bdev_name = rpc_bdev.bdev_rbd_create( - self.spdk_rpc_client, - name=name, - cluster_name=self._get_cluster(), - pool_name=request.rbd_pool_name, - rbd_name=request.rbd_image_name, - block_size=request.block_size, - uuid=request.uuid, - ) - self.logger.info(f"create_bdev: {bdev_name}") + json_req = json_format.MessageToJson(request, preserving_proto_field_name=True) + self.gateway_state.add_bdev(bdev_name, json_req) except Exception as ex: - self.logger.error(f"create_bdev failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.bdev() + self.logger.error( + f"Error persisting create_bdev {bdev_name}: {ex}") + raise - if context: - # Update gateway state - try: - json_req = json_format.MessageToJson(request, preserving_proto_field_name=True) - self.gateway_state.add_bdev(bdev_name, json_req) - except Exception as ex: - self.logger.error( - f"Error persisting create_bdev {bdev_name}: {ex}") - raise + return pb2.bdev(bdev_name=bdev_name, status=True) - return pb2.bdev(bdev_name=bdev_name, status=True) + def create_bdev(self, request, context=None): + with self.rpc_lock: + return self.create_bdev_safe(request, context) def find_bdev_namespaces(self, bdev_name): ns_list = [] @@ -193,385 +196,402 @@ def delete_bdev_handle_exception(self, context, ex): context.set_details(f"{ex}") return pb2.req_status() - def delete_bdev(self, request, context=None): + def delete_bdev_safe(self, request, context=None): """Deletes a bdev.""" - with self.rpc_lock: - self.logger.info(f"Received request to delete bdev {request.bdev_name}") - ns_list = self.find_bdev_namespaces(request.bdev_name) - for namespace in ns_list: - # We found a namespace still using this bdev. If --force was used we will try to remove this namespace. - # Otherwise fail with EBUSY + self.logger.info(f"Received request to delete bdev {request.bdev_name}") + ns_list = self.find_bdev_namespaces(request.bdev_name) + for namespace in ns_list: + # We found a namespace still using this bdev. If --force was used we will try to remove this namespace. + # Otherwise fail with EBUSY + try: + ns_nsid = namespace["nsid"] + ns_nqn = namespace["nqn"] + except Exception as ex: + self.logger.error(f"Got exception while trying to remove namespace: {namespace} which stil uses bdev {request.bdev_name}: {ex}") + continue + + if request.force: + self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") try: - ns_nsid = namespace["nsid"] - ns_nqn = namespace["nqn"] + req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=ns_nqn, nsid=ns_nsid) + # We already hold the lock, so call the safe version, do not try lock again + ret = self.remove_namespace_safe(req_rm_ns, context) + self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}: {ret.status}") except Exception as ex: - self.logger.error(f"Got exception while trying to remove namespace: {namespace} which stil uses bdev {request.bdev_name}: {ex}") - continue - - if request.force: - self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") - try: - req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=ns_nqn, nsid=ns_nsid, already_locked=True) - # We already hold the lock, so call the safe version, do not try lock again - ret = self.remove_namespace(req_rm_ns, context) - self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}: {ret.status}") - except Exception as ex: - self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") - pass - else: - self.logger.error(f"Namespace {ns_nsid} from {ns_nqn} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option") - req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} - ret = {"code": -errno.EBUSY, "message": os.strerror(errno.EBUSY)} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", "response:", json.dumps(ret, indent=2)]) - return self.delete_bdev_handle_exception(context, Exception(msg)) + self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") + pass + else: + self.logger.error(f"Namespace {ns_nsid} from {ns_nqn} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option") + req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} + ret = {"code": -errno.EBUSY, "message": os.strerror(errno.EBUSY)} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", "response:", json.dumps(ret, indent=2)]) + return self.delete_bdev_handle_exception(context, Exception(msg)) + + try: + ret = rpc_bdev.bdev_rbd_delete( + self.spdk_rpc_client, + request.bdev_name, + ) + self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") + except Exception as ex: + return self.delete_bdev_handle_exception(context, ex) + if context: + # Update gateway state try: - ret = rpc_bdev.bdev_rbd_delete( - self.spdk_rpc_client, - request.bdev_name, - ) - self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") + self.gateway_state.remove_bdev(request.bdev_name) except Exception as ex: - return self.delete_bdev_handle_exception(context, ex) + self.logger.error(f"Error persisting delete_bdev {request.bdev_name}: {ex}") + raise - if context: - # Update gateway state - try: - self.gateway_state.remove_bdev(request.bdev_name) - except Exception as ex: - self.logger.error( - f"Error persisting delete_bdev {request.bdev_name}: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def delete_bdev(self, request, context=None): + with self.rpc_lock: + return self.delete_bdev_safe(request, context) - def create_subsystem(self, request, context=None): + def create_subsystem_safe(self, request, context=None): """Creates a subsystem.""" - with self.rpc_lock: - self.logger.info(f"Received request to create subsystem {request.subsystem_nqn}") - min_cntlid = self.config.getint_with_default("gateway", "min_controller_id", 1) - max_cntlid = self.config.getint_with_default("gateway", "max_controller_id", 65519) - if not request.serial_number: - random.seed() - randser = random.randint(2, 99999999999999) - request.serial_number = f"SPDK{randser}" + self.logger.info(f"Received request to create subsystem {request.subsystem_nqn}") + min_cntlid = self.config.getint_with_default("gateway", "min_controller_id", 1) + max_cntlid = self.config.getint_with_default("gateway", "max_controller_id", 65519) + if not request.serial_number: + random.seed() + randser = random.randint(2, 99999999999999) + request.serial_number = f"SPDK{randser}" + try: + ret = rpc_nvmf.nvmf_create_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + serial_number=request.serial_number, + max_namespaces=request.max_namespaces, + min_cntlid=min_cntlid, + max_cntlid=max_cntlid, + ) + self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") + except Exception as ex: + self.logger.error(f"create_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state try: - ret = rpc_nvmf.nvmf_create_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - serial_number=request.serial_number, - max_namespaces=request.max_namespaces, - min_cntlid=min_cntlid, - max_cntlid=max_cntlid, - ) - self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") + json_req = json_format.MessageToJson(request, preserving_proto_field_name=True) + self.gateway_state.add_subsystem(request.subsystem_nqn, json_req) except Exception as ex: - self.logger.error(f"create_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.error(f"Error persisting create_subsystem {request.subsystem_nqn}: {ex}") + raise - if context: - # Update gateway state - try: - json_req = json_format.MessageToJson(request, preserving_proto_field_name=True) - self.gateway_state.add_subsystem(request.subsystem_nqn, json_req) - except Exception as ex: - self.logger.error(f"Error persisting create_subsystem {request.subsystem_nqn}: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def create_subsystem(self, request, context=None): + with self.rpc_lock: + return self.create_subsystem_safe(request, context) - def delete_subsystem(self, request, context=None): + def delete_subsystem_safe(self, request, context=None): """Deletes a subsystem.""" - with self.rpc_lock: - self.logger.info(f"Received request to delete subsystem {request.subsystem_nqn}") + self.logger.info(f"Received request to delete subsystem {request.subsystem_nqn}") + try: + ret = rpc_nvmf.nvmf_delete_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + ) + self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") + except Exception as ex: + self.logger.error(f"delete_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state try: - ret = rpc_nvmf.nvmf_delete_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - ) - self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") + self.gateway_state.remove_subsystem(request.subsystem_nqn) except Exception as ex: - self.logger.error(f"delete_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.error(f"Error persisting delete_subsystem {request.subsystem_nqn}: {ex}") + raise - if context: - # Update gateway state - try: - self.gateway_state.remove_subsystem(request.subsystem_nqn) - except Exception as ex: - self.logger.error(f"Error persisting delete_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def delete_subsystem(self, request, context=None): + with self.rpc_lock: + return self.delete_subsystem_safe(request, context) - def add_namespace(self, request, context=None): + def add_namespace_safe(self, request, context=None): """Adds a namespace to a subsystem.""" - with self.rpc_lock: - self.logger.info(f"Received request to add {request.bdev_name} to {request.subsystem_nqn}") + self.logger.info(f"Received request to add {request.bdev_name} to {request.subsystem_nqn}") + try: + nsid = rpc_nvmf.nvmf_subsystem_add_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + bdev_name=request.bdev_name, + nsid=request.nsid, + ) + self.logger.info(f"add_namespace: {nsid}") + except Exception as ex: + self.logger.error(f"add_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.nsid() + + if context: + # Update gateway state try: - nsid = rpc_nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - bdev_name=request.bdev_name, - nsid=request.nsid, - ) - self.logger.info(f"add_namespace: {nsid}") + if not request.nsid: + request.nsid = nsid + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_namespace(request.subsystem_nqn, str(nsid), json_req) except Exception as ex: - self.logger.error(f"add_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.nsid() + self.logger.error(f"Error persisting add_namespace {nsid}: {ex}") + raise - if context: - # Update gateway state - try: - if not request.nsid: - request.nsid = nsid - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_namespace(request.subsystem_nqn, str(nsid), json_req) - except Exception as ex: - self.logger.error(f"Error persisting add_namespace {nsid}: {ex}") - raise + return pb2.nsid(nsid=nsid, status=True) - return pb2.nsid(nsid=nsid, status=True) + def add_namespace(self, request, context=None): + with self.rpc_lock: + return self.add_namespace_safe(request, context) - def remove_namespace(self, request, context=None): + def remove_namespace_safe(self, request, context=None): """Removes a namespace from a subsystem.""" - # We can get here eihter directly or from within delete_bdev. In the latter case we already hold the RPC lock - # so just use a dummy lock which does nothing - if request.already_locked: - lock_to_use = contextlib.suppress() - else: - lock_to_use = self.rpc_lock + self.logger.info(f"Received request to remove {request.nsid} from {request.subsystem_nqn}") + try: + ret = rpc_nvmf.nvmf_subsystem_remove_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + nsid=request.nsid, + ) + self.logger.info(f"remove_namespace {request.nsid}: {ret}") + except Exception as ex: + self.logger.error(f"remove_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() - with lock_to_use: - self.logger.info(f"Received request to remove {request.nsid} from {request.subsystem_nqn}") + if context: + # Update gateway state try: - ret = rpc_nvmf.nvmf_subsystem_remove_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - nsid=request.nsid, - ) - self.logger.info(f"remove_namespace {request.nsid}: {ret}") + self.gateway_state.remove_namespace(request.subsystem_nqn, + str(request.nsid)) except Exception as ex: - self.logger.error(f"remove_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.error(f"Error persisting remove_namespace {request.nsid}: {ex}") + raise - if context: - # Update gateway state - try: - self.gateway_state.remove_namespace(request.subsystem_nqn, - str(request.nsid)) - except Exception as ex: - self.logger.error(f"Error persisting remove_namespace {request.nsid}: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def remove_namespace(self, request, context=None): + with self.rpc_lock: + return self.remove_namespace_safe(request, context) - def add_host(self, request, context=None): + def add_host_safe(self, request, context=None): """Adds a host to a subsystem.""" - with self.rpc_lock: + try: + if request.host_nqn == "*": # Allow any host access to subsystem + self.logger.info(f"Received request to allow any host to" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=False, + ) + self.logger.info(f"add_host *: {ret}") + else: # Allow single host access to subsystem + self.logger.info( + f"Received request to add host {request.host_nqn} to" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_add_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"add_host {request.host_nqn}: {ret}") + except Exception as ex: + self.logger.error(f"add_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state try: - if request.host_nqn == "*": # Allow any host access to subsystem - self.logger.info(f"Received request to allow any host to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=False, - ) - self.logger.info(f"add_host *: {ret}") - else: # Allow single host access to subsystem - self.logger.info( - f"Received request to add host {request.host_nqn} to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_add_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"add_host {request.host_nqn}: {ret}") + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_host(request.subsystem_nqn, + request.host_nqn, json_req) except Exception as ex: - self.logger.error(f"add_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.error(f"Error persisting add_host {request.host_nqn}: {ex}") + raise - if context: - # Update gateway state - try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_host(request.subsystem_nqn, - request.host_nqn, json_req) - except Exception as ex: - self.logger.error(f"Error persisting add_host {request.host_nqn}: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def add_host(self, request, context=None): + with self.rpc_lock: + return self.add_host_safe(request, context) - def remove_host(self, request, context=None): + def remove_host_safe(self, request, context=None): """Removes a host from a subsystem.""" - with self.rpc_lock: + try: + if request.host_nqn == "*": # Disable allow any host access + self.logger.info(f"Received request to disable any host access to {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=True, + ) + self.logger.info(f"remove_host *: {ret}") + else: # Remove single host access to subsystem + self.logger.info( + f"Received request to remove host_{request.host_nqn} from" + f" {request.subsystem_nqn}") + ret = rpc_nvmf.nvmf_subsystem_remove_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"remove_host {request.host_nqn}: {ret}") + except Exception as ex: + self.logger.error(f"remove_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state try: - if request.host_nqn == "*": # Disable allow any host access - self.logger.info( - f"Received request to disable any host access to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=True, - ) - self.logger.info(f"remove_host *: {ret}") - else: # Remove single host access to subsystem - self.logger.info( - f"Received request to remove host_{request.host_nqn} from" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_remove_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"remove_host {request.host_nqn}: {ret}") + self.gateway_state.remove_host(request.subsystem_nqn, + request.host_nqn) except Exception as ex: - self.logger.error(f"remove_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.error(f"Error persisting remove_host: {ex}") + raise - if context: - # Update gateway state - try: - self.gateway_state.remove_host(request.subsystem_nqn, - request.host_nqn) - except Exception as ex: - self.logger.error(f"Error persisting remove_host: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def remove_host(self, request, context=None): + with self.rpc_lock: + return self.remove_host_safe(request, context) - def create_listener(self, request, context=None): + def create_listener_safe(self, request, context=None): """Creates a listener for a subsystem at a given IP/Port.""" - with self.rpc_lock: - ret = True - self.logger.info(f"Received request to create {request.gateway_name}" - f" {request.trtype} listener for {request.nqn} at" - f" {request.traddr}:{request.trsvcid}.") + ret = True + self.logger.info(f"Received request to create {request.gateway_name}" + f" {request.trtype} listener for {request.nqn} at" + f" {request.traddr}:{request.trsvcid}.") + try: + if request.gateway_name == self.gateway_name: + ret = rpc_nvmf.nvmf_subsystem_add_listener( + self.spdk_rpc_client, + nqn=request.nqn, + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"create_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway ({self.gateway_name})") + except Exception as ex: + self.logger.error(f"create_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_add_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"create_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_listener(request.nqn, + request.gateway_name, + request.trtype, request.traddr, + request.trsvcid, json_req) except Exception as ex: - self.logger.error(f"create_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.error(f"Error persisting add_listener {request.trsvcid}: {ex}") + raise - if context: - # Update gateway state - try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_listener(request.nqn, - request.gateway_name, - request.trtype, request.traddr, - request.trsvcid, json_req) - except Exception as ex: - self.logger.error(f"Error persisting add_listener {request.trsvcid}: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def create_listener(self, request, context=None): + with self.rpc_lock: + return self.create_listener_safe(request, context) - def delete_listener(self, request, context=None): + def delete_listener_safe(self, request, context=None): """Deletes a listener from a subsystem at a given IP/Port.""" - with self.rpc_lock: - ret = True - self.logger.info(f"Received request to delete {request.gateway_name}" - f" {request.trtype} listener for {request.nqn} at" - f" {request.traddr}:{request.trsvcid}.") + ret = True + self.logger.info(f"Received request to delete {request.gateway_name}" + f" {request.trtype} listener for {request.nqn} at" + f" {request.traddr}:{request.trsvcid}.") + try: + if request.gateway_name == self.gateway_name: + ret = rpc_nvmf.nvmf_subsystem_remove_listener( + self.spdk_rpc_client, + nqn=request.nqn, + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"delete_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway ({self.gateway_name})") + except Exception as ex: + self.logger.error(f"delete_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_remove_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"delete_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") + self.gateway_state.remove_listener(request.nqn, + request.gateway_name, + request.trtype, + request.traddr, + request.trsvcid) except Exception as ex: - self.logger.error(f"delete_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() + self.logger.error(f"Error persisting delete_listener {request.trsvcid}: {ex}") + raise - if context: - # Update gateway state - try: - self.gateway_state.remove_listener(request.nqn, - request.gateway_name, - request.trtype, - request.traddr, - request.trsvcid) - except Exception as ex: - self.logger.error(f"Error persisting delete_listener {request.trsvcid}: {ex}") - raise + return pb2.req_status(status=ret) - return pb2.req_status(status=ret) + def delete_listener(self, request, context=None): + with self.rpc_lock: + return self.delete_listener_safe(request, context) - def get_subsystems(self, request, context): + def get_subsystems_safe(self, request, context): """Gets subsystems.""" - with self.rpc_lock: - self.logger.info(f"Received request to get subsystems") - try: - ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) - self.logger.info(f"get_subsystems: {ret}") - except Exception as ex: - self.logger.error(f"get_subsystems failed with: \n {ex}") - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.subsystems_info() + self.logger.info(f"Received request to get subsystems") + try: + ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) + self.logger.info(f"get_subsystems: {ret}") + except Exception as ex: + self.logger.error(f"get_subsystems failed with: \n {ex}") + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.subsystems_info() + + return pb2.subsystems_info(subsystems=json.dumps(ret)) - return pb2.subsystems_info(subsystems=json.dumps(ret)) + def get_subsystems(self, request, context): + with self.rpc_lock: + return self.get_subsystems_safe(request, context) diff --git a/control/proto/gateway.proto b/control/proto/gateway.proto index e7e89a690..c361e3e16 100644 --- a/control/proto/gateway.proto +++ b/control/proto/gateway.proto @@ -79,7 +79,6 @@ message add_namespace_req { message remove_namespace_req { string subsystem_nqn = 1; int32 nsid = 2; - optional bool already_locked = 3; } message add_host_req {