diff --git a/ceph-nvmeof.conf b/ceph-nvmeof.conf index fd566468..0c54d561 100644 --- a/ceph-nvmeof.conf +++ b/ceph-nvmeof.conf @@ -37,7 +37,8 @@ client_cert = ./client.crt [spdk] tgt_path = /usr/local/bin/nvmf_tgt -rpc_socket = /var/tmp/spdk.sock +#rpc_socket_dir = /var/tmp/ +#rpc_socket_name = spdk.sock #tgt_cmd_extra_args = --env-context="--no-huge -m1024" --iova-mode=va timeout = 60.0 log_level = WARN diff --git a/control/server.py b/control/server.py index 748f26d3..2c3f8277 100644 --- a/control/server.py +++ b/control/server.py @@ -66,6 +66,7 @@ def __init__(self, config): self.gateway_rpc = None self.server = None self.discovery_pid = None + self.spdk_rpc_socket_path = None self.name = self.config.get("gateway", "name") if not self.name: @@ -98,18 +99,19 @@ def serve(self): """Starts gateway server.""" self.logger.debug("Starting serve") + omap_state = OmapGatewayState(self.config) + local_state = LocalGatewayState() + # install SIGCHLD handler signal.signal(signal.SIGCHLD, sigchld_handler) # Start SPDK - self._start_spdk() + self._start_spdk(omap_state) # Start discovery service self._start_discovery_service() # Register service implementation with server - omap_state = OmapGatewayState(self.config) - local_state = LocalGatewayState() gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller) self.gateway_rpc = GatewayService(self.config, gateway_state, @@ -183,18 +185,38 @@ def _add_server_listener(self): self.server.add_insecure_port("{}:{}".format( gateway_addr, gateway_port)) - def _start_spdk(self): + def _get_spdk_rpc_socket_path(self, omap_state) -> str: + # For backward compatibility, try first to get the old attribute + spdk_rpc_socket = self.config.get_with_default("spdk", "rpc_socket", "") + if spdk_rpc_socket: + return spdk_rpc_socket + + spdk_rpc_socket_dir = self.config.get_with_default("spdk", "rpc_socket_dir", "") + if not spdk_rpc_socket_dir: + spdk_rpc_socket_dir = "/var/run/ceph/" + if omap_state.ceph_fsid: + spdk_rpc_socket_dir += omap_state.ceph_fsid + "/" + if not spdk_rpc_socket_dir.endswith("/"): + spdk_rpc_socket_dir += "/" + try: + os.makedirs(spdk_rpc_socket_dir, 0o777, True) + except Exception: + pass + spdk_rpc_socket = spdk_rpc_socket_dir + self.config.get_with_default("spdk", "rpc_socket_name", "spdk.sock") + return spdk_rpc_socket + + def _start_spdk(self, omap_state): """Starts SPDK process.""" # Start target self.logger.debug(f"Configuring server {self.name}") spdk_tgt_path = self.config.get("spdk", "tgt_path") self.logger.info(f"SPDK Target Path: {spdk_tgt_path}") - spdk_rpc_socket = self.config.get("spdk", "rpc_socket") - self.logger.info(f"SPDK Socket: {spdk_rpc_socket}") + self.spdk_rpc_socket_path = self._get_spdk_rpc_socket_path(omap_state) + self.logger.info(f"SPDK Socket: {self.spdk_rpc_socket_path}") spdk_tgt_cmd_extra_args = self.config.get_with_default( "spdk", "tgt_cmd_extra_args", "") - cmd = [spdk_tgt_path, "-u", "-r", spdk_rpc_socket] + cmd = [spdk_tgt_path, "-u", "-r", self.spdk_rpc_socket_path] if spdk_tgt_cmd_extra_args: cmd += shlex.split(spdk_tgt_cmd_extra_args) self.logger.info(f"Starting {' '.join(cmd)}") @@ -212,19 +234,19 @@ def _start_spdk(self): conn_retries = int(timeout * 5) self.logger.info(f"SPDK process id: {self.spdk_process.pid}") self.logger.info( - f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket}," + f"Attempting to initialize SPDK: rpc_socket: {self.spdk_rpc_socket_path}," f" conn_retries: {conn_retries}, timeout: {timeout}" ) try: self.spdk_rpc_client = rpc_client.JSONRPCClient( - spdk_rpc_socket, + self.spdk_rpc_socket_path, None, timeout, log_level=log_level, conn_retries=conn_retries, ) self.spdk_rpc_ping_client = rpc_client.JSONRPCClient( - spdk_rpc_socket, + self.spdk_rpc_socket_path, None, timeout, log_level=log_level, @@ -245,7 +267,6 @@ def _stop_spdk(self): assert self.spdk_process is not None # should be verified by the caller return_code = self.spdk_process.returncode - rpc_socket = self.config.get("spdk", "rpc_socket") # Terminate spdk process if return_code is not None: @@ -264,12 +285,12 @@ def _stop_spdk(self): self.spdk_process.kill() # kill -9, send KILL signal # Clean spdk rpc socket - if os.path.exists(rpc_socket): + if self.spdk_rpc_socket_path and os.path.exists(self.spdk_rpc_socket_path): try: - os.remove(rpc_socket) + os.remove(self.spdk_rpc_socket_path) except Exception: self.logger.exception(f"An error occurred while removing " - f"rpc socket {rpc_socket}:") + f"rpc socket {self.spdk_rpc_socket_path}:") def _stop_discovery(self): """Stops Discovery service process.""" diff --git a/control/state.py b/control/state.py index 98c338ae..3f5acc6f 100644 --- a/control/state.py +++ b/control/state.py @@ -170,6 +170,7 @@ def __init__(self, config): self.watch = None gateway_group = self.config.get("gateway", "group") self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state" + self.ceph_fsid = None try: self.ioctx = self.open_rados_connection(self.config) @@ -203,6 +204,16 @@ def fetch_and_display_ceph_version(self, conn): self.logger.debug(f"Got exception trying to fetch Ceph version: {ex}") pass + def fetch_ceph_fsid(self, conn) -> str: + fsid = None + try: + fsid = conn.get_fsid() + except Exception as ex: + self.logger.debug(f"Got exception trying to fetch Ceph fsid: {ex}") + pass + + return fsid + def open_rados_connection(self, config): ceph_pool = config.get("ceph", "pool") ceph_conf = config.get("ceph", "config_file") @@ -210,6 +221,7 @@ def open_rados_connection(self, config): conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id) conn.connect() self.fetch_and_display_ceph_version(conn) + self.ceph_fsid = self.fetch_ceph_fsid(conn) ioctx = conn.open_ioctx(ceph_pool) return ioctx diff --git a/tests/test_multi_gateway.py b/tests/test_multi_gateway.py index 8e0b3da0..a3cf217b 100644 --- a/tests/test_multi_gateway.py +++ b/tests/test_multi_gateway.py @@ -21,6 +21,7 @@ def conn(config): configA.config["gateway"]["min_controller_id"] = "1" configA.config["gateway"]["max_controller_id"] = "20000" configA.config["gateway"]["enable_spdk_discovery_controller"] = "True" + configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock" configB = copy.deepcopy(configA) addr = configA.get("gateway", "addr") portA = configA.getint("gateway", "port") @@ -31,7 +32,7 @@ def conn(config): configA.config["gateway"]["max_controller_id"] = "40000" configB.config["gateway"]["state_update_interval_sec"] = str( update_interval_sec) - configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock" + configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02" # Start servers diff --git a/tests/test_server.py b/tests/test_server.py index 1bddc93a..cd199749 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -55,7 +55,7 @@ def test_spdk_multi_gateway_exception(self): configB = copy.deepcopy(configA) configB.config["gateway"]["name"] = "GatewayB" configB.config["gateway"]["port"] = str(configA.getint("gateway", "port") + 1) - configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock" + configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock" # invalid arg, spdk would exit with code 1 at start up configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x343435545"