diff --git a/.github/workflows/build-container.yml b/.github/workflows/build-container.yml index 296d2c82..4aaa1949 100644 --- a/.github/workflows/build-container.yml +++ b/.github/workflows/build-container.yml @@ -65,7 +65,7 @@ jobs: strategy: fail-fast: false matrix: - test: ["cli", "state", "multi_gateway", "server", "grpc"] + test: ["cli", "state", "multi_gateway", "server", "grpc", "omap_lock"] runs-on: ubuntu-latest env: HUGEPAGES: 512 # for multi gateway test, approx 256 per gateway instance diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index 0c54d561..07666a58 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -18,6 +18,10 @@ state_update_interval_sec = 5 #min_controller_id = 1 #max_controller_id = 65519 enable_spdk_discovery_controller = False +#omap_file_lock_duration = 60 +#omap_file_lock_retries = 15 +#omap_file_lock_retry_sleep_interval = 5 +#omap_file_update_reloads = 10 [discovery] addr = 0.0.0.0 diff --git a/control/grpc.py b/control/grpc.py index 782089dc..b0e808b5 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -42,7 +42,7 @@ class GatewayService(pb2_grpc.GatewayServicer): spdk_rpc_client: Client of SPDK RPC server """ - def __init__(self, config, gateway_state, spdk_rpc_client) -> None: + def __init__(self, config, gateway_state, omap_lock, spdk_rpc_client) -> None: """Constructor""" self.logger = logging.getLogger(__name__) ver = os.getenv("NVMEOF_VERSION") @@ -73,6 +73,7 @@ def __init__(self, config, gateway_state, spdk_rpc_client) -> None: config.dump_config_file(self.logger) self.rpc_lock = threading.Lock() self.gateway_state = gateway_state + self.omap_lock = omap_lock self.spdk_rpc_client = spdk_rpc_client self.gateway_name = self.config.get("gateway", "name") if not self.gateway_name: @@ -119,6 +120,13 @@ def _alloc_cluster(self) -> str: ) return name + def _grpc_function_with_lock(self, func, request, context): + with self.rpc_lock: + return func(request, context) + + def execute_grpc_function(self, func, request, context): 
+ return self.omap_lock.execute_omap_locking_function(self._grpc_function_with_lock, func, request, context) + def create_bdev_safe(self, request, context=None): """Creates a bdev from an RBD image.""" @@ -128,41 +136,41 @@ def create_bdev_safe(self, request, context=None): name = request.uuid if not request.bdev_name else request.bdev_name self.logger.info(f"Received request to create bdev {name} from" f" {request.rbd_pool_name}/{request.rbd_image_name}" - f" with block size {request.block_size}") - try: - bdev_name = rpc_bdev.bdev_rbd_create( - self.spdk_rpc_client, - name=name, - cluster_name=self._get_cluster(), - pool_name=request.rbd_pool_name, - rbd_name=request.rbd_image_name, - block_size=request.block_size, - uuid=request.uuid, - ) - self.logger.info(f"create_bdev: {bdev_name}") - except Exception as ex: - self.logger.error(f"create_bdev failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.bdev() - - if context: - # Update gateway state + f" with block size {request.block_size}, context: {context}") + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_bdev(bdev_name, json_req) + bdev_name = rpc_bdev.bdev_rbd_create( + self.spdk_rpc_client, + name=name, + cluster_name=self._get_cluster(), + pool_name=request.rbd_pool_name, + rbd_name=request.rbd_image_name, + block_size=request.block_size, + uuid=request.uuid, + ) + self.logger.info(f"create_bdev: {bdev_name}") except Exception as ex: - self.logger.error( - f"Error persisting create_bdev {bdev_name}: {ex}") - raise + self.logger.error(f"create_bdev failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.bdev() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + 
self.gateway_state.add_bdev(bdev_name, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting create_bdev {bdev_name}: {ex}") + raise return pb2.bdev(bdev_name=bdev_name, status=True) def create_bdev(self, request, context=None): - with self.rpc_lock: - return self.create_bdev_safe(request, context) + return self.execute_grpc_function(self.create_bdev_safe, request, context) def get_bdev_namespaces(self, bdev_name) -> list: ns_list = [] @@ -192,59 +200,59 @@ def delete_bdev_handle_exception(self, context, ex): def delete_bdev_safe(self, request, context=None): """Deletes a bdev.""" - self.logger.info(f"Received request to delete bdev {request.bdev_name}") + self.logger.info(f"Received request to delete bdev {request.bdev_name}, context: {context}") ns_list = [] - if context: - ns_list = self.get_bdev_namespaces(request.bdev_name) - for namespace in ns_list: - # We found a namespace still using this bdev. If --force was used we will try to remove the namespace from OMAP. - # Otherwise fail with EBUSY - try: - ns_nsid = namespace["nsid"] - ns_nqn = namespace["nqn"] - except Exception as ex: - self.logger.error(f"Got exception while trying to remove namespace: {namespace} which stil uses bdev {request.bdev_name}: {ex}") - continue - - if request.force: - self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") + with self.omap_lock(context=context): + if context: + ns_list = self.get_bdev_namespaces(request.bdev_name) + for namespace in ns_list: + # We found a namespace still using this bdev. If --force was used we will try to remove the namespace from OMAP. 
+ # Otherwise fail with EBUSY try: - self.gateway_state.remove_namespace(ns_nqn, str(ns_nsid)) - self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}") + ns_nsid = namespace["nsid"] + ns_nqn = namespace["nqn"] except Exception as ex: - self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") - pass - else: - self.logger.error(f"Namespace {ns_nsid} from {ns_nqn} is still using bdev {request.bdev_name}. You need to either remove it or use the '--force' command line option") - req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} - ret = {"code": -errno.EBUSY, "message": os.strerror(errno.EBUSY)} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent = 2), - "Got JSON-RPC error response", "response:", json.dumps(ret, indent = 2)]) - return self.delete_bdev_handle_exception(context, Exception(msg)) - - try: - ret = rpc_bdev.bdev_rbd_delete( - self.spdk_rpc_client, - request.bdev_name, - ) - self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") - except Exception as ex: - return self.delete_bdev_handle_exception(context, ex) + self.logger.error(f"Got exception while trying to remove namespace: {namespace} which stil uses bdev {request.bdev_name}: {ex}") + continue + + if request.force: + self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") + try: + self.gateway_state.remove_namespace(ns_nqn, str(ns_nsid)) + self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}") + except Exception as ex: + self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") + pass + else: + self.logger.error(f"Namespace {ns_nsid} from {ns_nqn} is still using bdev {request.bdev_name}. 
You need to either remove it or use the '--force' command line option") + req = {"name": request.bdev_name, "method": "bdev_rbd_delete", "req_id": 0} + ret = {"code": -errno.EBUSY, "message": os.strerror(errno.EBUSY)} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent = 2), + "Got JSON-RPC error response", "response:", json.dumps(ret, indent = 2)]) + return self.delete_bdev_handle_exception(context, Exception(msg)) - if context: - # Update gateway state try: - self.gateway_state.remove_bdev(request.bdev_name) + ret = rpc_bdev.bdev_rbd_delete( + self.spdk_rpc_client, + request.bdev_name, + ) + self.logger.info(f"delete_bdev {request.bdev_name}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting delete_bdev {request.bdev_name}: {ex}") - raise + return self.delete_bdev_handle_exception(context, ex) + + if context: + # Update gateway state + try: + self.gateway_state.remove_bdev(request.bdev_name) + except Exception as ex: + self.logger.error( + f"Error persisting delete_bdev {request.bdev_name}: {ex}") + raise return pb2.req_status(status=ret) def delete_bdev(self, request, context=None): - with self.rpc_lock: - return self.delete_bdev_safe(request, context) + return self.execute_grpc_function(self.delete_bdev_safe, request, context) def is_discovery_nqn(self, nqn) -> bool: return nqn == DiscoveryService.DISCOVERY_NQN @@ -270,7 +278,7 @@ def create_subsystem_safe(self, request, context=None): """Creates a subsystem.""" self.logger.info( - f"Received request to create subsystem {request.subsystem_nqn}, ana reporting: {request.ana_reporting} ") + f"Received request to create subsystem {request.subsystem_nqn}, ana reporting: {request.ana_reporting}, context: {context}") if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't create a discovery subsystem") @@ -283,99 +291,99 @@ def create_subsystem_safe(self, request, context=None): request.serial_number = f"SPDK{randser}" self.logger.info(f"No serial number specified, 
will use {request.serial_number}") - try: - subsys_using_serial = self.serial_number_already_used(context, request.serial_number) - if subsys_using_serial: - self.logger.error(f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}") - req = {"subsystem_nqn": request.subsystem_nqn, - "serial_number": request.serial_number, - "max_namespaces": request.max_namespaces, - "ana_reporting": request.ana_reporting, - "enable_ha": request.enable_ha, - "method": "nvmf_create_subsystem", "req_id": 0} - ret = {"code": -errno.EEXIST, "message": f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - raise Exception(msg) - ret = rpc_nvmf.nvmf_create_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - serial_number=request.serial_number, - max_namespaces=request.max_namespaces, - min_cntlid=min_cntlid, - max_cntlid=max_cntlid, - ana_reporting = request.ana_reporting, - ) - self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"create_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_subsystem(request.subsystem_nqn, - json_req) + subsys_using_serial = self.serial_number_already_used(context, request.serial_number) + if subsys_using_serial: + self.logger.error(f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}") + req = {"subsystem_nqn": request.subsystem_nqn, + "serial_number": request.serial_number, + "max_namespaces": request.max_namespaces, + "ana_reporting": 
request.ana_reporting, + "enable_ha": request.enable_ha, + "method": "nvmf_create_subsystem", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"Serial number {request.serial_number} already used by subsystem {subsys_using_serial}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + ret = rpc_nvmf.nvmf_create_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + serial_number=request.serial_number, + max_namespaces=request.max_namespaces, + min_cntlid=min_cntlid, + max_cntlid=max_cntlid, + ana_reporting = request.ana_reporting, + ) + self.logger.info(f"create_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting create_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"create_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_subsystem(request.subsystem_nqn, + json_req) + except Exception as ex: + self.logger.error(f"Error persisting create_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) def create_subsystem(self, request, context=None): - with self.rpc_lock: - return self.create_subsystem_safe(request, context) + return self.execute_grpc_function(self.create_subsystem_safe, request, context) def delete_subsystem_safe(self, request, context=None): """Deletes a subsystem.""" self.logger.info( - f"Received request to delete subsystem {request.subsystem_nqn}") + f"Received request to delete subsystem {request.subsystem_nqn}, context: {context}") if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't delete a discovery subsystem") - try: - ret = 
rpc_nvmf.nvmf_delete_subsystem( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - ) - self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"delete_subsystem failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_subsystem(request.subsystem_nqn) + ret = rpc_nvmf.nvmf_delete_subsystem( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + ) + self.logger.info(f"delete_subsystem {request.subsystem_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting delete_subsystem" - f" {request.subsystem_nqn}: {ex}") - raise + self.logger.error(f"delete_subsystem failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_subsystem(request.subsystem_nqn) + except Exception as ex: + self.logger.error(f"Error persisting delete_subsystem" + f" {request.subsystem_nqn}: {ex}") + raise return pb2.req_status(status=ret) def delete_subsystem(self, request, context=None): - with self.rpc_lock: - return self.delete_subsystem_safe(request, context) + return self.execute_grpc_function(self.delete_subsystem_safe, request, context) def add_namespace_safe(self, request, context=None): """Adds a namespace to a subsystem.""" self.logger.info(f"Received request to add {request.bdev_name} to" - f" {request.subsystem_nqn}") + f" {request.subsystem_nqn}, context: {context}") if request.anagrpid > MAX_ANA_GROUPS: raise Exception(f"Error group ID {request.anagrpid} is more than configured maximum {MAX_ANA_GROUPS}") @@ -383,80 +391,80 @@ def add_namespace_safe(self, request, context=None): if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't add a 
namespace to a discovery subsystem") - try: - nsid = rpc_nvmf.nvmf_subsystem_add_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - bdev_name=request.bdev_name, - nsid=request.nsid, - anagrpid=request.anagrpid, - ) - self.logger.info(f"add_namespace: {nsid}") - except Exception as ex: - self.logger.error(f"add_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.nsid() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - if not request.nsid: - request.nsid = nsid - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_namespace(request.subsystem_nqn, - str(nsid), json_req) + nsid = rpc_nvmf.nvmf_subsystem_add_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + bdev_name=request.bdev_name, + nsid=request.nsid, + anagrpid=request.anagrpid, + ) + self.logger.info(f"add_namespace: {nsid}") except Exception as ex: - self.logger.error( - f"Error persisting add_namespace {nsid}: {ex}") - raise + self.logger.error(f"add_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.nsid() + + if context: + # Update gateway state + try: + if not request.nsid: + request.nsid = nsid + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_namespace(request.subsystem_nqn, + str(nsid), json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_namespace {nsid}: {ex}") + raise return pb2.nsid(nsid=nsid, status=True) def add_namespace(self, request, context=None): - with self.rpc_lock: - return self.add_namespace_safe(request, context) + return self.execute_grpc_function(self.add_namespace_safe, request, context) def remove_namespace_safe(self, request, context=None): """Removes a namespace from a subsystem.""" self.logger.info(f"Received request to 
remove nsid {request.nsid} from" - f" {request.subsystem_nqn}") + f" {request.subsystem_nqn}, context: {context}") if self.is_discovery_nqn(request.subsystem_nqn): raise Exception(f"Can't remove a namespace from a discovery subsystem") - try: - ret = rpc_nvmf.nvmf_subsystem_remove_ns( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - nsid=request.nsid, - ) - self.logger.info(f"remove_namespace {request.nsid}: {ret}") - except Exception as ex: - self.logger.error(f"remove_namespace failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_namespace(request.subsystem_nqn, - str(request.nsid)) + ret = rpc_nvmf.nvmf_subsystem_remove_ns( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + nsid=request.nsid, + ) + self.logger.info(f"remove_namespace {request.nsid}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting remove_namespace {request.nsid}: {ex}") - raise + self.logger.error(f"remove_namespace failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_namespace(request.subsystem_nqn, + str(request.nsid)) + except Exception as ex: + self.logger.error( + f"Error persisting remove_namespace {request.nsid}: {ex}") + raise return pb2.req_status(status=ret) def remove_namespace(self, request, context=None): - with self.rpc_lock: - return self.remove_namespace_safe(request, context) + return self.execute_grpc_function(self.remove_namespace_safe, request, context) def matching_host_exists(self, context, subsys_nqn, host_nqn) -> bool: if not context: @@ -477,67 +485,67 @@ def add_host_safe(self, request, context=None): if self.is_discovery_nqn(request.host_nqn): raise Exception(f"Can't use a 
discovery NQN as host NQN") - try: - host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn) - if host_already_exist: - if request.host_nqn == "*": - self.logger.error(f"All hosts already allowed to {request.subsystem_nqn}") - req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, - "method": "nvmf_subsystem_allow_any_host", "req_id": 0} - ret = {"code": -errno.EEXIST, "message": f"All hosts already allowed to {request.subsystem_nqn}"} - else: - self.logger.error(f"Host {request.host_nqn} already added to {request.subsystem_nqn}") - req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, - "method": "nvmf_subsystem_add_host", "req_id": 0} - ret = {"code": -errno.EEXIST, "message": f"Host {request.host_nqn} already added to {request.subsystem_nqn}"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - raise Exception(msg) - if request.host_nqn == "*": # Allow any host access to subsystem - self.logger.info(f"Received request to allow any host to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=False, - ) - self.logger.info(f"add_host *: {ret}") - else: # Allow single host access to subsystem - self.logger.info( - f"Received request to add host {request.host_nqn} to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_add_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"add_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"add_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - json_req = json_format.MessageToJson( - request, 
preserving_proto_field_name=True) - self.gateway_state.add_host(request.subsystem_nqn, - request.host_nqn, json_req) + host_already_exist = self.matching_host_exists(context, request.subsystem_nqn, request.host_nqn) + if host_already_exist: + if request.host_nqn == "*": + self.logger.error(f"All hosts already allowed to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_allow_any_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"All hosts already allowed to {request.subsystem_nqn}"} + else: + self.logger.error(f"Host {request.host_nqn} already added to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_add_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"Host {request.host_nqn} already added to {request.subsystem_nqn}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + if request.host_nqn == "*": # Allow any host access to subsystem + self.logger.info(f"Received request to allow any host to" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=False, + ) + self.logger.info(f"add_host *: {ret}") + else: # Allow single host access to subsystem + self.logger.info( + f"Received request to add host {request.host_nqn} to" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_add_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"add_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error( - f"Error persisting add_host {request.host_nqn}: {ex}") - raise + self.logger.error(f"add_host failed with: \n {ex}") + if context: + 
context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_host(request.subsystem_nqn, + request.host_nqn, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_host {request.host_nqn}: {ex}") + raise return pb2.req_status(status=ret) def add_host(self, request, context=None): - with self.rpc_lock: - return self.add_host_safe(request, context) + return self.execute_grpc_function(self.add_host_safe, request, context) def remove_host_safe(self, request, context=None): """Removes a host from a subsystem.""" @@ -548,48 +556,48 @@ def remove_host_safe(self, request, context=None): if self.is_discovery_nqn(request.host_nqn): raise Exception(f"Can't use a discovery NQN as host NQN") - try: - if request.host_nqn == "*": # Disable allow any host access - self.logger.info( - f"Received request to disable any host access to" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_allow_any_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - disable=True, - ) - self.logger.info(f"remove_host *: {ret}") - else: # Remove single host access to subsystem - self.logger.info( - f"Received request to remove host_{request.host_nqn} from" - f" {request.subsystem_nqn}") - ret = rpc_nvmf.nvmf_subsystem_remove_host( - self.spdk_rpc_client, - nqn=request.subsystem_nqn, - host=request.host_nqn, - ) - self.logger.info(f"remove_host {request.host_nqn}: {ret}") - except Exception as ex: - self.logger.error(f"remove_host failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_host(request.subsystem_nqn, - request.host_nqn) + if request.host_nqn == "*": # Disable 
allow any host access + self.logger.info( + f"Received request to disable any host access to" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_allow_any_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + disable=True, + ) + self.logger.info(f"remove_host *: {ret}") + else: # Remove single host access to subsystem + self.logger.info( + f"Received request to remove host_{request.host_nqn} from" + f" {request.subsystem_nqn}, context: {context}") + ret = rpc_nvmf.nvmf_subsystem_remove_host( + self.spdk_rpc_client, + nqn=request.subsystem_nqn, + host=request.host_nqn, + ) + self.logger.info(f"remove_host {request.host_nqn}: {ret}") except Exception as ex: - self.logger.error(f"Error persisting remove_host: {ex}") - raise + self.logger.error(f"remove_host failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_host(request.subsystem_nqn, + request.host_nqn) + except Exception as ex: + self.logger.error(f"Error persisting remove_host: {ex}") + raise return pb2.req_status(status=ret) def remove_host(self, request, context=None): - with self.rpc_lock: - return self.remove_host_safe(request, context) + return self.execute_grpc_function(self.remove_host_safe, request, context) def matching_listener_exists(self, context, nqn, gw_name, trtype, traddr, trsvcid) -> bool: if not context: @@ -607,99 +615,99 @@ def create_listener_safe(self, request, context=None): traddr = GatewayConfig.escape_address_if_ipv6(request.traddr) self.logger.info(f"Received request to create {request.gateway_name}" f" {request.trtype} listener for {request.nqn} at" - f" {traddr}:{request.trsvcid}.") + f" {traddr}:{request.trsvcid}., context: {context}") if self.is_discovery_nqn(request.nqn): raise Exception(f"Can't create a listener for a discovery subsystem") - try: - if request.gateway_name == 
self.gateway_name: - listener_already_exist = self.matching_listener_exists( - context, request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid) - if listener_already_exist: - self.logger.error(f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}") - req = {"nqn": request.nqn, "trtype": request.trtype, "traddr": request.traddr, - "gateway_name": request.gateway_name, - "trsvcid": request.trsvcid, "adrfam": request.adrfam, - "method": "nvmf_subsystem_add_listener", "req_id": 0} - ret = {"code": -errno.EEXIST, "message": f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}"} - msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), - "Got JSON-RPC error response", - "response:", - json.dumps(ret, indent=2)]) - raise Exception(msg) - ret = rpc_nvmf.nvmf_subsystem_add_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"create_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"create_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - state = self.gateway_state.local.get_state() - enable_ha = False - subsys_str = state.get(GatewayState.build_subsystem_key(request.nqn)) - if subsys_str: - self.logger.debug(f"value of sub-system: {subsys_str}") + with self.omap_lock(context=context): try: - subsys_dict = json.loads(subsys_str) - try: - enable_ha = subsys_dict["enable_ha"] - except KeyError: - enable_ha = False - self.logger.info(f"enable_ha: {enable_ha}") - except Exception as ex: - self.logger.error(f"Got exception trying to parse subsystem {request.nqn}: {ex}") - pass - else: - self.logger.info(f"No subsystem for {request.nqn}") - - if 
enable_ha: - for x in range (MAX_ANA_GROUPS): - try: - ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( + if request.gateway_name == self.gateway_name: + listener_already_exist = self.matching_listener_exists( + context, request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid) + if listener_already_exist: + self.logger.error(f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}") + req = {"nqn": request.nqn, "trtype": request.trtype, "traddr": request.traddr, + "gateway_name": request.gateway_name, + "trsvcid": request.trsvcid, "adrfam": request.adrfam, + "method": "nvmf_subsystem_add_listener", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) + ret = rpc_nvmf.nvmf_subsystem_add_listener( self.spdk_rpc_client, nqn=request.nqn, - ana_state="inaccessible", trtype=request.trtype, traddr=request.traddr, trsvcid=request.trsvcid, adrfam=request.adrfam, - anagrpid=(x+1) ) - except Exception as ex: - self.logger.error(f" set_listener_ana_state failed with: \n {ex}") - raise - - if context: - # Update gateway state - try: - json_req = json_format.MessageToJson( - request, preserving_proto_field_name=True) - self.gateway_state.add_listener(request.nqn, - request.gateway_name, - request.trtype, request.traddr, - request.trsvcid, json_req) + ) + self.logger.info(f"create_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - f"Error persisting add_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"create_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return 
pb2.req_status() + + state = self.gateway_state.local.get_state() + enable_ha = False + subsys_str = state.get(GatewayState.build_subsystem_key(request.nqn)) + if subsys_str: + self.logger.debug(f"value of sub-system: {subsys_str}") + try: + subsys_dict = json.loads(subsys_str) + try: + enable_ha = subsys_dict["enable_ha"] + except KeyError: + enable_ha = False + self.logger.info(f"enable_ha: {enable_ha}") + except Exception as ex: + self.logger.error(f"Got exception trying to parse subsystem {request.nqn}: {ex}") + pass + else: + self.logger.info(f"No subsystem for {request.nqn}") + + if enable_ha: + for x in range (MAX_ANA_GROUPS): + try: + ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( + self.spdk_rpc_client, + nqn=request.nqn, + ana_state="inaccessible", + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + anagrpid=(x+1) ) + except Exception as ex: + self.logger.error(f" set_listener_ana_state failed with: \n {ex}") + raise + + if context: + # Update gateway state + try: + json_req = json_format.MessageToJson( + request, preserving_proto_field_name=True) + self.gateway_state.add_listener(request.nqn, + request.gateway_name, + request.trtype, request.traddr, + request.trsvcid, json_req) + except Exception as ex: + self.logger.error( + f"Error persisting add_listener {request.trsvcid}: {ex}") + raise return pb2.req_status(status=ret) def create_listener(self, request, context=None): - with self.rpc_lock: - return self.create_listener_safe(request, context) + return self.execute_grpc_function(self.create_listener_safe, request, context) def delete_listener_safe(self, request, context=None): """Deletes a listener from a subsystem at a given IP/Port.""" @@ -708,55 +716,55 @@ def delete_listener_safe(self, request, context=None): traddr = GatewayConfig.escape_address_if_ipv6(request.traddr) self.logger.info(f"Received request to delete {request.gateway_name}" f" {request.trtype} listener for {request.nqn} at" 
- f" {traddr}:{request.trsvcid}.") + f" {traddr}:{request.trsvcid}., context: {context}") if self.is_discovery_nqn(request.nqn): raise Exception(f"Can't delete a listener from a discovery subsystem") - try: - if request.gateway_name == self.gateway_name: - ret = rpc_nvmf.nvmf_subsystem_remove_listener( - self.spdk_rpc_client, - nqn=request.nqn, - trtype=request.trtype, - traddr=request.traddr, - trsvcid=request.trsvcid, - adrfam=request.adrfam, - ) - self.logger.info(f"delete_listener: {ret}") - else: - raise Exception(f"Gateway name must match current gateway" - f" ({self.gateway_name})") - except Exception as ex: - self.logger.error(f"delete_listener failed with: \n {ex}") - if context: - context.set_code(grpc.StatusCode.INTERNAL) - context.set_details(f"{ex}") - return pb2.req_status() - - if context: - # Update gateway state + with self.omap_lock(context=context): try: - self.gateway_state.remove_listener(request.nqn, - request.gateway_name, - request.trtype, - request.traddr, - request.trsvcid) + if request.gateway_name == self.gateway_name: + ret = rpc_nvmf.nvmf_subsystem_remove_listener( + self.spdk_rpc_client, + nqn=request.nqn, + trtype=request.trtype, + traddr=request.traddr, + trsvcid=request.trsvcid, + adrfam=request.adrfam, + ) + self.logger.info(f"delete_listener: {ret}") + else: + raise Exception(f"Gateway name must match current gateway" + f" ({self.gateway_name})") except Exception as ex: - self.logger.error( - f"Error persisting delete_listener {request.trsvcid}: {ex}") - raise + self.logger.error(f"delete_listener failed with: \n {ex}") + if context: + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"{ex}") + return pb2.req_status() + + if context: + # Update gateway state + try: + self.gateway_state.remove_listener(request.nqn, + request.gateway_name, + request.trtype, + request.traddr, + request.trsvcid) + except Exception as ex: + self.logger.error( + f"Error persisting delete_listener {request.trsvcid}: {ex}") + raise return 
pb2.req_status(status=ret) def delete_listener(self, request, context=None): - with self.rpc_lock: - return self.delete_listener_safe(request, context) + return self.execute_grpc_function(self.delete_listener_safe, request, context) def get_subsystems_safe(self, request, context): """Gets subsystems.""" - self.logger.info(f"Received request to get subsystems") + self.logger.info(f"Received request to get subsystems, context: {context}") try: ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client) self.logger.info(f"get_subsystems: {ret}") diff --git a/control/server.py b/control/server.py index 3f891074..7675c2e8 100644 --- a/control/server.py +++ b/control/server.py @@ -25,7 +25,7 @@ from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc -from .state import GatewayState, LocalGatewayState, OmapGatewayState, GatewayStateHandler +from .state import GatewayState, LocalGatewayState, OmapLock, OmapGatewayState, GatewayStateHandler from .grpc import GatewayService from .discovery import DiscoveryService from .config import GatewayConfig @@ -113,10 +113,9 @@ def serve(self): self._start_discovery_service() # Register service implementation with server - gateway_state = GatewayStateHandler(self.config, local_state, - omap_state, self.gateway_rpc_caller) - self.gateway_rpc = GatewayService(self.config, gateway_state, - self.spdk_rpc_client) + gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller) + omap_lock = OmapLock(omap_state, gateway_state) + self.gateway_rpc = GatewayService(self.config, gateway_state, omap_lock, self.spdk_rpc_client) self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server) diff --git a/control/state.py b/control/state.py index f4e575b4..36d8c0ce 100644 --- a/control/state.py +++ b/control/state.py @@ -11,6 +11,7 @@ import threading import rados import logging +import errno from typing import 
Dict from collections import defaultdict from abc import ABC, abstractmethod @@ -165,6 +166,124 @@ def reset(self, omap_state): self.state = omap_state +class OmapLock: + OMAP_FILE_LOCK_NAME = "omap_file_lock" + OMAP_FILE_LOCK_COOKIE = "omap_file_cookie" + + def __init__(self, omap_state, gateway_state) -> None: + self.logger = omap_state.logger + self.omap_state = omap_state + self.gateway_state = gateway_state + self.omap_file_lock_duration = self.omap_state.config.getint_with_default("gateway", "omap_file_lock_duration", 60) + self.omap_file_update_reloads = self.omap_state.config.getint_with_default("gateway", "omap_file_update_reloads", 10) + self.omap_file_lock_retries = self.omap_state.config.getint_with_default("gateway", "omap_file_lock_retries", 15) + self.omap_file_lock_retry_sleep_interval = self.omap_state.config.getint_with_default("gateway", + "omap_file_lock_retry_sleep_interval", 5) + # This is used for testing purposes only. To allow us testing locking from two gateways at the same time + self.omap_file_disable_unlock = self.omap_state.config.getboolean_with_default("gateway", "omap_file_disable_unlock", False) + if self.omap_file_disable_unlock: + self.logger.warning(f"Will not unlock OMAP file for testing purposes") + self.enter_args = {} + + def __call__(self, **kwargs): + self.enter_args.clear() + self.enter_args.update(kwargs) + return self + + # + # We pass the context from the different functions here. It should point to a real object in case we come from a real + # resource changing function, resulting from a CLI command. It will be None in case we come from an automatic update + # which is done because the local state is out of date. In case context is None, that is we're in the middle of an update + # we should not try to lock the OMAP file as the code will not try to make changes there, only the local spdk calls + # are done in such a case. 
+ # + def __enter__(self): + context = self.enter_args.get("context") + if context and self.omap_file_lock_duration > 0: + self.lock_omap() + return self + + def __exit__(self, typ, value, traceback): + context = self.enter_args.get("context") + self.enter_args.clear() + if context and self.omap_file_lock_duration > 0: + self.unlock_omap() + + # + # This function accepts a function in which there is Omap locking. It will execute this function + # and in case the Omap is not current, will reload it and try again + # + def execute_omap_locking_function(self, grpc_func, omap_locking_func, request, context): + for i in range(1, self.omap_file_update_reloads): + need_to_update = False + try: + return grpc_func(omap_locking_func, request, context) + except OSError as err: + if err.errno == errno.EAGAIN: + need_to_update = True + else: + raise + + assert need_to_update + for j in range(10): + if self.gateway_state.update(): + # update was succesful, we can stop trying + break + time.sleep(1) + + if need_to_update: + raise Exception(f"Unable to lock OMAP file after reloading {self.omap_file_update_reloads} times, exiting") + + def lock_omap(self): + got_lock = False + + for i in range(1, self.omap_file_lock_retries): + try: + self.omap_state.ioctx.lock_exclusive(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, + self.OMAP_FILE_LOCK_COOKIE, "OMAP file changes lock", self.omap_file_lock_duration, 0) + got_lock = True + if i > 1: + self.logger.info(f"Succeeded to lock OMAP file after {i} tries") + break + except rados.ObjectExists as ex: + self.logger.info(f"We already locked the OMAP file") + got_lock = True + break + except rados.ObjectBusy as ex: + self.logger.warning( + f"The OMAP file is locked, will try again in {self.omap_file_lock_retry_sleep_interval} seconds") + time.sleep(self.omap_file_lock_retry_sleep_interval) + except Exception as ex: + self.logger.error(f"Unable to lock OMAP file, exiting: {ex}") + raise + + if not got_lock: + self.logger.error(f"Unable to 
lock OMAP file after {self.omap_file_lock_retries} tries. Exiting!") + raise Exception("Unable to lock OMAP file") + + omap_version = self.omap_state.get_omap_version() + local_version = self.omap_state.get_local_version() + + if omap_version > local_version: + self.logger.warning( + f"Local version {local_version} differs from OMAP file version {omap_version}." + f" The file is not current, will reload it and try again") + self.unlock_omap() + raise OSError(errno.EAGAIN, "Unable to lock OMAP file, file not current", self.omap_state.omap_name) + + def unlock_omap(self): + if self.omap_file_disable_unlock: + self.logger.warning(f"OMAP file unlock was disabled, will not unlock file") + return + + try: + self.omap_state.ioctx.unlock(self.omap_state.omap_name, self.OMAP_FILE_LOCK_NAME, self.OMAP_FILE_LOCK_COOKIE) + except rados.ObjectNotFound as ex: + self.logger.warning(f"No such lock, the lock duration might have passed") + except Exception as ex: + self.logger.error(f"Unable to unlock OMAP file: {ex}") + pass + class OmapGatewayState(GatewayState): """Persists gateway NVMeoF target state to an OMAP object. 
@@ -390,6 +509,7 @@ def __init__(self, config, local, omap, gateway_rpc_caller): self.update_interval = 1 self.use_notify = self.config.getboolean("gateway", "state_update_notify") + self.update_is_active_lock = threading.Lock() def add_bdev(self, bdev_name: str, val: str): """Adds a bdev to the state data stores.""" @@ -482,53 +602,61 @@ def compare_state_values(self, val1, val2) -> bool: val2_str = val2.decode() if type(val2) == type(b'') else val2 return val1_str == val2_str - def update(self): + def update(self) -> bool: """Checks for updated omap state and initiates local update.""" - prefix_list = [ - GatewayState.BDEV_PREFIX, GatewayState.SUBSYSTEM_PREFIX, - GatewayState.NAMESPACE_PREFIX, GatewayState.HOST_PREFIX, - GatewayState.LISTENER_PREFIX - ] - - # Get version and state from OMAP - omap_state_dict = self.omap.get_state() - omap_version = int(omap_state_dict[self.omap.OMAP_VERSION_KEY]) - - if self.omap.get_local_version() < omap_version: - local_state_dict = self.local.get_state() - local_state_keys = local_state_dict.keys() - omap_state_keys = omap_state_dict.keys() - - # Find OMAP additions - added_keys = omap_state_keys - local_state_keys - added = {key: omap_state_dict[key] for key in added_keys} - grouped_added = self._group_by_prefix(added, prefix_list) - # Find OMAP changes - same_keys = omap_state_keys & local_state_keys - changed = { - key: omap_state_dict[key] - for key in same_keys - if not self.compare_state_values(local_state_dict[key], omap_state_dict[key]) - } - grouped_changed = self._group_by_prefix(changed, prefix_list) - # Find OMAP removals - removed_keys = local_state_keys - omap_state_keys - removed = {key: local_state_dict[key] for key in removed_keys} - grouped_removed = self._group_by_prefix(removed, prefix_list) - - # Handle OMAP removals and remove outdated changed components - grouped_removed.update(grouped_changed) - if grouped_removed: - self._update_call_rpc(grouped_removed, False, prefix_list) - # Handle OMAP additions 
and add updated changed components - grouped_added.update(grouped_changed) - if grouped_added: - self._update_call_rpc(grouped_added, True, prefix_list) - - # Update local state and version - self.local.reset(omap_state_dict) - self.omap.set_local_version(omap_version) - self.logger.debug("Update complete.") + + if self.update_is_active_lock.locked(): + self.logger.warning(f"An update is already running, ignore") + return False + + with self.update_is_active_lock: + prefix_list = [ + GatewayState.BDEV_PREFIX, GatewayState.SUBSYSTEM_PREFIX, + GatewayState.NAMESPACE_PREFIX, GatewayState.HOST_PREFIX, + GatewayState.LISTENER_PREFIX + ] + + # Get version and state from OMAP + omap_state_dict = self.omap.get_state() + omap_version = int(omap_state_dict[self.omap.OMAP_VERSION_KEY]) + + if self.omap.get_local_version() < omap_version: + local_state_dict = self.local.get_state() + local_state_keys = local_state_dict.keys() + omap_state_keys = omap_state_dict.keys() + + # Find OMAP additions + added_keys = omap_state_keys - local_state_keys + added = {key: omap_state_dict[key] for key in added_keys} + grouped_added = self._group_by_prefix(added, prefix_list) + # Find OMAP changes + same_keys = omap_state_keys & local_state_keys + changed = { + key: omap_state_dict[key] + for key in same_keys + if not self.compare_state_values(local_state_dict[key], omap_state_dict[key]) + } + grouped_changed = self._group_by_prefix(changed, prefix_list) + # Find OMAP removals + removed_keys = local_state_keys - omap_state_keys + removed = {key: local_state_dict[key] for key in removed_keys} + grouped_removed = self._group_by_prefix(removed, prefix_list) + + # Handle OMAP removals and remove outdated changed components + grouped_removed.update(grouped_changed) + if grouped_removed: + self._update_call_rpc(grouped_removed, False, prefix_list) + # Handle OMAP additions and add updated changed components + grouped_added.update(grouped_changed) + if grouped_added: + 
self._update_call_rpc(grouped_added, True, prefix_list) + + # Update local state and version + self.local.reset(omap_state_dict) + self.omap.set_local_version(omap_version) + self.logger.debug("Update complete.") + + return True def _group_by_prefix(self, state_update, prefix_list): """Groups state update by key prefixes.""" diff --git a/tests/test_omap_lock.py b/tests/test_omap_lock.py new file mode 100644 index 00000000..107afc3d --- /dev/null +++ b/tests/test_omap_lock.py @@ -0,0 +1,208 @@ +import pytest +import copy +import grpc +import json +import time +from control.server import GatewayServer +from control.proto import gateway_pb2 as pb2 +from control.proto import gateway_pb2_grpc as pb2_grpc +import spdk.rpc.bdev as rpc_bdev + +image = "mytestdevimage" +pool = "rbd" +bdev_prefix = "Ceph_" +subsystem_prefix = "nqn.2016-06.io.spdk:cnode" +created_resource_count = 500 + +@pytest.fixture(scope="function") +def conn(config, request): + """Sets up and tears down Gateways A and B.""" + update_notify = True + update_interval_sec = 5 + disable_unlock = False + lock_duration = 60 + if request.node.name == "test_multi_gateway_omap_reread": + update_notify = False + update_interval_sec = 300 + elif request.node.name == "test_trying_to_lock_twice": + disable_unlock = True + lock_duration = 100 # This should be bigger than lock retries * retry sleep interval + + # Setup GatewayA and GatewayB configs + configA = copy.deepcopy(config) + configA.config["gateway"]["name"] = "GatewayA" + configA.config["gateway"]["group"] = "Group1" + configA.config["gateway"]["state_update_notify"] = str(update_notify) + configA.config["gateway"]["state_update_interval_sec"] = str(update_interval_sec) + configA.config["gateway"]["omap_file_disable_unlock"] = str(disable_unlock) + configA.config["gateway"]["omap_file_lock_duration"] = str(lock_duration) + configA.config["gateway"]["min_controller_id"] = "1" + configA.config["gateway"]["max_controller_id"] = "20000" + 
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True" + configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock" + configB = copy.deepcopy(configA) + addr = configA.get("gateway", "addr") + portA = configA.getint("gateway", "port") + portB = portA + 1 + configB.config["gateway"]["name"] = "GatewayB" + configB.config["gateway"]["port"] = str(portB) + configB.config["gateway"]["min_controller_id"] = "20001" + configB.config["gateway"]["max_controller_id"] = "40000" + configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" + configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02" + + # Start servers + with ( + GatewayServer(configA) as gatewayA, + GatewayServer(configB) as gatewayB, + ): + gatewayA.serve() + # Delete existing OMAP state + gatewayA.gateway_rpc.gateway_state.delete_state() + # Create new + gatewayB.serve() + + # Bind the client and Gateways A & B + channelA = grpc.insecure_channel(f"{addr}:{portA}") + stubA = pb2_grpc.GatewayStub(channelA) + channelB = grpc.insecure_channel(f"{addr}:{portB}") + stubB = pb2_grpc.GatewayStub(channelB) + yield stubA, stubB, gatewayA.gateway_rpc, gatewayB.gateway_rpc + + # Stop gateways + gatewayA.server.stop(grace=1) + gatewayB.server.stop(grace=1) + gatewayB.gateway_rpc.gateway_state.delete_state() + +def test_multi_gateway_omap_reread(config, conn, caplog): + """Tests reading out of date OMAP file + """ + stubA, stubB, gatewayA, gatewayB = conn + bdev = bdev_prefix + "X0" + bdev2 = bdev_prefix + "X1" + bdev3 = bdev_prefix + "X2" + nqn = subsystem_prefix + "X1" + serial = "SPDK00000000000001" + nsid = 10 + num_subsystems = 2 + + # Send requests to create a subsystem with one namespace to GatewayA + bdev_req = pb2.create_bdev_req(bdev_name=bdev, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + subsystem_req = pb2.create_subsystem_req(subsystem_nqn=nqn, + serial_number=serial) + namespace_req = pb2.add_namespace_req(subsystem_nqn=nqn, + bdev_name=bdev, + nsid=nsid) + 
get_subsystems_req = pb2.get_subsystems_req() + ret_bdev = stubA.create_bdev(bdev_req) + ret_subsystem = stubA.create_subsystem(subsystem_req) + ret_namespace = stubA.add_namespace(namespace_req) + assert ret_bdev.status is True + assert ret_subsystem.status is True + assert ret_namespace.status is True + + # Until we create some resource on GW-B it shouldn't still have the resrouces created on GW-A, only the discovery subsystem + watchB = stubB.get_subsystems(get_subsystems_req) + listB = json.loads(watchB.subsystems) + assert len(listB) == 1 + + watchA = stubA.get_subsystems(get_subsystems_req) + listA = json.loads(watchA.subsystems) + assert len(listA) == num_subsystems + + bdev2_req = pb2.create_bdev_req(bdev_name=bdev2, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + ret_bdev2 = stubB.create_bdev(bdev2_req) + assert ret_bdev2.status is True + assert "The file is not current, will reload it and try again" in caplog.text + + # Make sure that after reading the OMAP file GW-B has the subsystem and namespace created on GW-A + watchB = stubB.get_subsystems(get_subsystems_req) + listB = json.loads(watchB.subsystems) + assert len(listB) == num_subsystems + assert listB[num_subsystems-1]["nqn"] == nqn + assert listB[num_subsystems-1]["serial_number"] == serial + assert listB[num_subsystems-1]["namespaces"][0]["nsid"] == nsid + assert listB[num_subsystems-1]["namespaces"][0]["bdev_name"] == bdev + + caplog.clear() + bdev3_req = pb2.create_bdev_req(bdev_name=bdev3, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + ret_bdev3 = stubB.create_bdev(bdev3_req) + assert ret_bdev3.status is True + assert "The file is not current, will reload it and try again" not in caplog.text + + bdevsA = rpc_bdev.bdev_get_bdevs(gatewayA.spdk_rpc_client) + bdevsB = rpc_bdev.bdev_get_bdevs(gatewayB.spdk_rpc_client) + # GW-B should have the bdev created on GW-A after reading the OMAP file plus the two we created on it + # GW-A should only have the bdev created 
on it as we didn't update it after creating the bdev on GW-B + assert len(bdevsA) == 1 + assert len(bdevsB) == 3 + assert bdevsA[0]["name"] == bdev + assert bdevsB[0]["name"] == bdev + assert bdevsB[1]["name"] == bdev2 + assert bdevsB[2]["name"] == bdev3 + +def test_trying_to_lock_twice(config, image, conn, caplog): + """Tests an attempt to lock the OMAP file from two gateways at the same time + """ + caplog.clear() + stubA, stubB, gatewayA, gatewayB = conn + + with pytest.raises(Exception) as ex: + create_resource_by_index(stubA, 0) + create_resource_by_index(stubB, 1) + assert "OMAP file unlock was disabled, will not unlock file" in caplog.text + assert "The OMAP file is locked, will try again in" in caplog.text + assert "Unable to lock OMAP file" in caplog.text + time.sleep(120) # Wait enough time for OMAP lock to be released + +def create_resource_by_index(stub, i): + bdev = f"{bdev_prefix}{i}" + bdev_req = pb2.create_bdev_req(bdev_name=bdev, + rbd_pool_name=pool, + rbd_image_name=image, + block_size=4096) + ret_bdev = stub.create_bdev(bdev_req) + assert ret_bdev + subsystem = f"{subsystem_prefix}{i}" + subsystem_req = pb2.create_subsystem_req(subsystem_nqn=subsystem) + ret_subsystem = stub.create_subsystem(subsystem_req) + assert ret_subsystem + namespace_req = pb2.add_namespace_req(subsystem_nqn=subsystem, + bdev_name=bdev) + ret_namespace = stub.add_namespace(namespace_req) + assert ret_namespace + +def check_resource_by_index(i, caplog): + bdev = f"{bdev_prefix}{i}" + # notice that this also verifies the namespace as the bdev name is in the namespaces section + assert f"{bdev}" in caplog.text + subsystem = f"{subsystem_prefix}{i}" + assert f"{subsystem}" in caplog.text + +def test_multi_gateway_concurrent_changes(config, image, conn, caplog): + """Tests concurrent changes to the OMAP from two gateways + """ + caplog.clear() + stubA, stubB, gatewayA, gatewayB = conn + + for i in range(created_resource_count): + if i % 2: + stub = stubA + else: + stub = stubB 
+ create_resource_by_index(stub, i) + assert "Failed" not in caplog.text + + # Give the update some time to bring both gateways to the same page + time.sleep(15) + for i in range(created_resource_count): + check_resource_by_index(i, caplog)