Skip to content

Commit

Permalink
Create the SPDK socket under /var/run/ceph/<fsid> and not /var/tmp
Browse files Browse the repository at this point in the history
Fixes ceph#216

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Oct 29, 2023
1 parent a673152 commit f814cf4
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 17 deletions.
3 changes: 2 additions & 1 deletion ceph-nvmeof.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ client_cert = ./client.crt

[spdk]
tgt_path = /usr/local/bin/nvmf_tgt
rpc_socket = /var/tmp/spdk.sock
#rpc_socket_dir = /var/tmp/
#rpc_socket_name = spdk.sock
#tgt_cmd_extra_args = --env-context="--no-huge -m1024" --iova-mode=va
timeout = 60.0
log_level = WARN
Expand Down
49 changes: 35 additions & 14 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(self, config):
self.gateway_rpc = None
self.server = None
self.discovery_pid = None
self.spdk_rpc_socket_path = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand Down Expand Up @@ -98,18 +99,19 @@ def serve(self):
"""Starts gateway server."""
self.logger.debug("Starting serve")

omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()

# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# Start SPDK
self._start_spdk()
self._start_spdk(omap_state)

# Start discovery service
self._start_discovery_service()

# Register service implementation with server
omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()
gateway_state = GatewayStateHandler(self.config, local_state,
omap_state, self.gateway_rpc_caller)
self.gateway_rpc = GatewayService(self.config, gateway_state,
Expand Down Expand Up @@ -183,18 +185,38 @@ def _add_server_listener(self):
self.server.add_insecure_port("{}:{}".format(
gateway_addr, gateway_port))

def _start_spdk(self):
def _get_spdk_rpc_socket_path(self, omap_state) -> str:
# For backward compatibility, try first to get the old attribute
spdk_rpc_socket = self.config.get_with_default("spdk", "rpc_socket", "")
if spdk_rpc_socket:
return spdk_rpc_socket

spdk_rpc_socket_dir = self.config.get_with_default("spdk", "rpc_socket_dir", "")
if not spdk_rpc_socket_dir:
spdk_rpc_socket_dir = "/var/run/ceph/"
if omap_state.ceph_fsid:
spdk_rpc_socket_dir += omap_state.ceph_fsid + "/"
if not spdk_rpc_socket_dir.endswith("/"):
spdk_rpc_socket_dir += "/"
try:
os.makedirs(spdk_rpc_socket_dir, 0o777, True)
except Exception:
pass
spdk_rpc_socket = spdk_rpc_socket_dir + self.config.get_with_default("spdk", "rpc_socket_name", "spdk.sock")
return spdk_rpc_socket

def _start_spdk(self, omap_state):
"""Starts SPDK process."""

# Start target
self.logger.debug(f"Configuring server {self.name}")
spdk_tgt_path = self.config.get("spdk", "tgt_path")
self.logger.info(f"SPDK Target Path: {spdk_tgt_path}")
spdk_rpc_socket = self.config.get("spdk", "rpc_socket")
self.logger.info(f"SPDK Socket: {spdk_rpc_socket}")
self.spdk_rpc_socket_path = self._get_spdk_rpc_socket_path(omap_state)
self.logger.info(f"SPDK Socket: {self.spdk_rpc_socket_path}")
spdk_tgt_cmd_extra_args = self.config.get_with_default(
"spdk", "tgt_cmd_extra_args", "")
cmd = [spdk_tgt_path, "-u", "-r", spdk_rpc_socket]
cmd = [spdk_tgt_path, "-u", "-r", self.spdk_rpc_socket_path]
if spdk_tgt_cmd_extra_args:
cmd += shlex.split(spdk_tgt_cmd_extra_args)
self.logger.info(f"Starting {' '.join(cmd)}")
Expand All @@ -212,19 +234,19 @@ def _start_spdk(self):
conn_retries = int(timeout * 5)
self.logger.info(f"SPDK process id: {self.spdk_process.pid}")
self.logger.info(
f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket},"
f"Attempting to initialize SPDK: rpc_socket: {self.spdk_rpc_socket_path},"
f" conn_retries: {conn_retries}, timeout: {timeout}"
)
try:
self.spdk_rpc_client = rpc_client.JSONRPCClient(
spdk_rpc_socket,
self.spdk_rpc_socket_path,
None,
timeout,
log_level=log_level,
conn_retries=conn_retries,
)
self.spdk_rpc_ping_client = rpc_client.JSONRPCClient(
spdk_rpc_socket,
self.spdk_rpc_socket_path,
None,
timeout,
log_level=log_level,
Expand All @@ -245,7 +267,6 @@ def _stop_spdk(self):
assert self.spdk_process is not None # should be verified by the caller

return_code = self.spdk_process.returncode
rpc_socket = self.config.get("spdk", "rpc_socket")

# Terminate spdk process
if return_code is not None:
Expand All @@ -264,12 +285,12 @@ def _stop_spdk(self):
self.spdk_process.kill() # kill -9, send KILL signal

# Clean spdk rpc socket
if os.path.exists(rpc_socket):
if self.spdk_rpc_socket_path and os.path.exists(self.spdk_rpc_socket_path):
try:
os.remove(rpc_socket)
os.remove(self.spdk_rpc_socket_path)
except Exception:
self.logger.exception(f"An error occurred while removing "
f"rpc socket {rpc_socket}:")
f"rpc socket {self.spdk_rpc_socket_path}:")

def _stop_discovery(self):
"""Stops Discovery service process."""
Expand Down
12 changes: 12 additions & 0 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def __init__(self, config):
self.watch = None
gateway_group = self.config.get("gateway", "group")
self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state"
self.ceph_fsid = None

try:
self.ioctx = self.open_rados_connection(self.config)
Expand Down Expand Up @@ -203,13 +204,24 @@ def fetch_and_display_ceph_version(self, conn):
self.logger.debug(f"Got exception trying to fetch Ceph version: {ex}")
pass

def fetch_ceph_fsid(self, conn) -> str:
fsid = None
try:
fsid = conn.get_fsid()
except Exception as ex:
self.logger.debug(f"Got exception trying to fetch Ceph fsid: {ex}")
pass

return fsid

def open_rados_connection(self, config):
ceph_pool = config.get("ceph", "pool")
ceph_conf = config.get("ceph", "config_file")
rados_id = config.get_with_default("ceph", "id", "")
conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id)
conn.connect()
self.fetch_and_display_ceph_version(conn)
self.ceph_fsid = self.fetch_ceph_fsid(conn)
ioctx = conn.open_ioctx(ceph_pool)
return ioctx

Expand Down
3 changes: 2 additions & 1 deletion tests/test_multi_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def conn(config):
configA.config["gateway"]["min_controller_id"] = "1"
configA.config["gateway"]["max_controller_id"] = "20000"
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True"
configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock"
configB = copy.deepcopy(configA)
addr = configA.get("gateway", "addr")
portA = configA.getint("gateway", "port")
Expand All @@ -31,7 +32,7 @@ def conn(config):
configA.config["gateway"]["max_controller_id"] = "40000"
configB.config["gateway"]["state_update_interval_sec"] = str(
update_interval_sec)
configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock"
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02"

# Start servers
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_spdk_multi_gateway_exception(self):
configB = copy.deepcopy(configA)
configB.config["gateway"]["name"] = "GatewayB"
configB.config["gateway"]["port"] = str(configA.getint("gateway", "port") + 1)
configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock"
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
# invalid arg, spdk would exit with code 1 at start up
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x343435545"

Expand Down

0 comments on commit f814cf4

Please sign in to comment.