Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create the SPDK socket under /var/run/ceph/<fsid> and not /var/tmp #296

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ceph-nvmeof.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ client_cert = ./client.crt

[spdk]
tgt_path = /usr/local/bin/nvmf_tgt
rpc_socket = /var/tmp/spdk.sock
#rpc_socket_dir = /var/tmp/
gbregman marked this conversation as resolved.
Show resolved Hide resolved
#rpc_socket_name = spdk.sock
#tgt_cmd_extra_args = --env-context="--no-huge -m1024" --iova-mode=va
timeout = 60.0
log_level = WARN
Expand Down
49 changes: 35 additions & 14 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(self, config):
self.gateway_rpc = None
self.server = None
self.discovery_pid = None
self.spdk_rpc_socket_path = None

self.name = self.config.get("gateway", "name")
if not self.name:
Expand Down Expand Up @@ -98,18 +99,19 @@ def serve(self):
"""Starts gateway server."""
self.logger.debug("Starting serve")

omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()

# install SIGCHLD handler
signal.signal(signal.SIGCHLD, sigchld_handler)

# Start SPDK
self._start_spdk()
self._start_spdk(omap_state)

# Start discovery service
self._start_discovery_service()

# Register service implementation with server
omap_state = OmapGatewayState(self.config)
local_state = LocalGatewayState()
gateway_state = GatewayStateHandler(self.config, local_state,
omap_state, self.gateway_rpc_caller)
self.gateway_rpc = GatewayService(self.config, gateway_state,
Expand Down Expand Up @@ -183,18 +185,38 @@ def _add_server_listener(self):
self.server.add_insecure_port("{}:{}".format(
gateway_addr, gateway_port))

def _start_spdk(self):
def _get_spdk_rpc_socket_path(self, omap_state) -> str:
    """Return the path of the SPDK RPC unix socket for this gateway.

    Resolution order:

    1. The legacy ``[spdk] rpc_socket`` option; when set it is returned
       verbatim (kept for backward compatibility) and no directory is
       created.
    2. ``[spdk] rpc_socket_dir`` followed by ``[spdk] rpc_socket_name``
       (the name defaults to ``spdk.sock``).
    3. When no directory is configured, ``/var/run/ceph/<fsid>/`` is used,
       where ``<fsid>`` is ``omap_state.ceph_fsid``; if the fsid is not
       available the socket goes directly under ``/var/run/ceph/``.

    The socket directory is created on a best-effort basis: a failure is
    logged as a warning instead of being raised, and the computed path is
    returned regardless.
    """
    # For backward compatibility, try first to get the old attribute
    legacy_socket = self.config.get_with_default("spdk", "rpc_socket", "")
    if legacy_socket:
        return legacy_socket

    socket_dir = self.config.get_with_default("spdk", "rpc_socket_dir", "")
    if not socket_dir:
        socket_dir = "/var/run/ceph/"
        if omap_state.ceph_fsid:
            socket_dir += omap_state.ceph_fsid + "/"
    if not socket_dir.endswith("/"):
        socket_dir += "/"

    try:
        os.makedirs(socket_dir, mode=0o777, exist_ok=True)
    except Exception as ex:
        # Still best effort, but no longer silent: without this message a
        # later SPDK startup failure would give no hint that the socket
        # directory is missing.
        self.logger.warning(
            f"Failed to create SPDK RPC socket directory {socket_dir}: {ex}")

    socket_name = self.config.get_with_default(
        "spdk", "rpc_socket_name", "spdk.sock")
    return socket_dir + socket_name

def _start_spdk(self, omap_state):
"""Starts SPDK process."""

# Start target
self.logger.debug(f"Configuring server {self.name}")
spdk_tgt_path = self.config.get("spdk", "tgt_path")
self.logger.info(f"SPDK Target Path: {spdk_tgt_path}")
spdk_rpc_socket = self.config.get("spdk", "rpc_socket")
self.logger.info(f"SPDK Socket: {spdk_rpc_socket}")
self.spdk_rpc_socket_path = self._get_spdk_rpc_socket_path(omap_state)
self.logger.info(f"SPDK Socket: {self.spdk_rpc_socket_path}")
spdk_tgt_cmd_extra_args = self.config.get_with_default(
"spdk", "tgt_cmd_extra_args", "")
cmd = [spdk_tgt_path, "-u", "-r", spdk_rpc_socket]
cmd = [spdk_tgt_path, "-u", "-r", self.spdk_rpc_socket_path]
if spdk_tgt_cmd_extra_args:
cmd += shlex.split(spdk_tgt_cmd_extra_args)
self.logger.info(f"Starting {' '.join(cmd)}")
Expand All @@ -212,19 +234,19 @@ def _start_spdk(self):
conn_retries = int(timeout * 5)
self.logger.info(f"SPDK process id: {self.spdk_process.pid}")
self.logger.info(
f"Attempting to initialize SPDK: rpc_socket: {spdk_rpc_socket},"
f"Attempting to initialize SPDK: rpc_socket: {self.spdk_rpc_socket_path},"
f" conn_retries: {conn_retries}, timeout: {timeout}"
)
try:
self.spdk_rpc_client = rpc_client.JSONRPCClient(
spdk_rpc_socket,
self.spdk_rpc_socket_path,
None,
timeout,
log_level=log_level,
conn_retries=conn_retries,
)
self.spdk_rpc_ping_client = rpc_client.JSONRPCClient(
spdk_rpc_socket,
self.spdk_rpc_socket_path,
None,
timeout,
log_level=log_level,
Expand All @@ -245,7 +267,6 @@ def _stop_spdk(self):
assert self.spdk_process is not None # should be verified by the caller

return_code = self.spdk_process.returncode
rpc_socket = self.config.get("spdk", "rpc_socket")

# Terminate spdk process
if return_code is not None:
Expand All @@ -264,12 +285,12 @@ def _stop_spdk(self):
self.spdk_process.kill() # kill -9, send KILL signal

# Clean spdk rpc socket
if os.path.exists(rpc_socket):
if self.spdk_rpc_socket_path and os.path.exists(self.spdk_rpc_socket_path):
try:
os.remove(rpc_socket)
os.remove(self.spdk_rpc_socket_path)
except Exception:
self.logger.exception(f"An error occurred while removing "
f"rpc socket {rpc_socket}:")
f"rpc socket {self.spdk_rpc_socket_path}:")

def _stop_discovery(self):
"""Stops Discovery service process."""
Expand Down
12 changes: 12 additions & 0 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def __init__(self, config):
self.watch = None
gateway_group = self.config.get("gateway", "group")
self.omap_name = f"nvmeof.{gateway_group}.state" if gateway_group else "nvmeof.state"
self.ceph_fsid = None

try:
self.ioctx = self.open_rados_connection(self.config)
Expand Down Expand Up @@ -203,13 +204,24 @@ def fetch_and_display_ceph_version(self, conn):
self.logger.debug(f"Got exception trying to fetch Ceph version: {ex}")
pass

def fetch_ceph_fsid(self, conn) -> str:
    """Ask the cluster connection *conn* for the Ceph fsid.

    This is a best-effort query: any exception raised by
    ``conn.get_fsid()`` is logged at debug level and swallowed, in which
    case ``None`` is returned instead of a string.
    """
    try:
        return conn.get_fsid()
    except Exception as ex:
        self.logger.debug(f"Got exception trying to fetch Ceph fsid: {ex}")
    return None

def open_rados_connection(self, config):
    """Connect to the Ceph cluster and return an I/O context for the pool.

    Reads ``[ceph] pool``, ``[ceph] config_file`` and the optional
    ``[ceph] id`` from *config*, connects to the cluster, logs the Ceph
    version, stores the cluster fsid in ``self.ceph_fsid`` and opens an
    I/O context on the configured pool.

    Any exception raised after the connection is established is
    re-raised, but only after the connection has been shut down so the
    cluster handle is not leaked.
    """
    ceph_pool = config.get("ceph", "pool")
    ceph_conf = config.get("ceph", "config_file")
    rados_id = config.get_with_default("ceph", "id", "")
    conn = rados.Rados(conffile=ceph_conf, rados_id=rados_id)
    conn.connect()
    try:
        self.fetch_and_display_ceph_version(conn)
        self.ceph_fsid = self.fetch_ceph_fsid(conn)
        ioctx = conn.open_ioctx(ceph_pool)
    except Exception:
        # e.g. the pool does not exist: release the cluster connection
        # before propagating the error to the caller.
        conn.shutdown()
        raise
    return ioctx

Expand Down
3 changes: 2 additions & 1 deletion tests/test_multi_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def conn(config):
configA.config["gateway"]["min_controller_id"] = "1"
configA.config["gateway"]["max_controller_id"] = "20000"
configA.config["gateway"]["enable_spdk_discovery_controller"] = "True"
configA.config["spdk"]["rpc_socket_name"] = "spdk_GatewayA.sock"
configB = copy.deepcopy(configA)
addr = configA.get("gateway", "addr")
portA = configA.getint("gateway", "port")
Expand All @@ -31,7 +32,7 @@ def conn(config):
configA.config["gateway"]["max_controller_id"] = "40000"
configB.config["gateway"]["state_update_interval_sec"] = str(
update_interval_sec)
configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock"
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x02"

# Start servers
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_spdk_multi_gateway_exception(self):
configB = copy.deepcopy(configA)
configB.config["gateway"]["name"] = "GatewayB"
configB.config["gateway"]["port"] = str(configA.getint("gateway", "port") + 1)
configB.config["spdk"]["rpc_socket"] = "/var/tmp/spdk_GatewayB.sock"
configB.config["spdk"]["rpc_socket_name"] = "spdk_GatewayB.sock"
# invalid arg, spdk would exit with code 1 at start up
configB.config["spdk"]["tgt_cmd_extra_args"] = "-m 0x343435545"

Expand Down