Skip to content

Commit

Permalink
Fail any attempt to create a duplicate listener or host.
Browse files Browse the repository at this point in the history
Also, minor changes to delete_bdev and ANA handling in create_listener.
Fixes ceph#144

Signed-off-by: Gil Bregman <[email protected]>
  • Loading branch information
gbregman committed Oct 22, 2023
1 parent 4d74dcf commit c04cf91
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 20 deletions.
4 changes: 2 additions & 2 deletions control/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ def delete_bdev(self, args):
argument("-n", "--subnqn", help="Subsystem NQN", required=True),
argument("-s", "--serial", help="Serial number", required=False),
argument("-m", "--max-namespaces", help="Maximum number of namespaces", type=int, default=0, required=False),
argument("-a", "--ana-reporting", help="Enable ANA reporting", type=bool, default=False, required=False),
argument("-t", "--enable-ha", help="Enable automatic HA" , type=bool, default=False, required=False),
argument("-a", "--ana-reporting", help="Enable ANA reporting", action='store_true', required=False),
argument("-t", "--enable-ha", help="Enable automatic HA", action='store_true', required=False),
])
def create_subsystem(self, args):
Expand Down
89 changes: 72 additions & 17 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from google.protobuf import json_format
from .proto import gateway_pb2 as pb2
from .proto import gateway_pb2_grpc as pb2_grpc
from typing import Dict

MAX_ANA_GROUPS = 4

Expand Down Expand Up @@ -126,6 +127,10 @@ def _alloc_cluster(self) -> str:
)
return name

def get_state(self) -> Dict[str, str]:
local_state_dict = self.gateway_state.local.get_state()
return local_state_dict

def create_bdev_safe(self, request, context=None):
"""Creates a bdev from an RBD image."""

Expand Down Expand Up @@ -171,10 +176,9 @@ def create_bdev(self, request, context=None):
with self.rpc_lock:
return self.create_bdev_safe(request, context)

def find_bdev_namespaces(self, bdev_name):
def get_bdev_namespaces(self, bdev_name) -> list:
ns_list = []
local_state_dict = self.gateway_state.local.get_state()
local_state_keys = local_state_dict.keys()
local_state_dict = self.get_state()
for key, val in local_state_dict.items():
if key.startswith(self.gateway_state.local.NAMESPACE_PREFIX):
try:
Expand All @@ -201,9 +205,11 @@ def delete_bdev_safe(self, request, context=None):
"""Deletes a bdev."""

self.logger.info(f"Received request to delete bdev {request.bdev_name}")
ns_list = self.find_bdev_namespaces(request.bdev_name)
ns_list = []
if context:
ns_list = self.get_bdev_namespaces(request.bdev_name)
for namespace in ns_list:
# We found a namespace still using this bdev. If --force was used we will try to remove this namespace.
# We found a namespace still using this bdev. If --force was used we will try to remove the namespace from OMAP.
# Otherwise fail with EBUSY
try:
ns_nsid = namespace["nsid"]
Expand All @@ -215,10 +221,8 @@ def delete_bdev_safe(self, request, context=None):
if request.force:
self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}")
try:
req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=ns_nqn, nsid=ns_nsid)
# We already hold the lock, so call the safe version, do not try to lock again
ret = self.remove_namespace_safe(req_rm_ns, context)
self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}: {ret.status}")
self.gateway_state.remove_namespace(ns_nqn, str(ns_nsid))
self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}")
except Exception as ex:
self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}")
pass
Expand Down Expand Up @@ -412,10 +416,34 @@ def remove_namespace(self, request, context=None):
with self.rpc_lock:
return self.remove_namespace_safe(request, context)

def matching_host_exist(self, subsys_nqn, host_nqn) -> bool:
host_key = "_".join([self.gateway_state.local.HOST_PREFIX + subsys_nqn, host_nqn])
if self.get_state().get(host_key):
return True
else:
return False

def add_host_safe(self, request, context=None):
"""Adds a host to a subsystem."""

try:
host_already_exist = self.matching_host_exist(request.subsystem_nqn, request.host_nqn)
if host_already_exist:
if request.host_nqn == "*":
self.logger.error(f"All hosts already allowed to {request.subsystem_nqn}")
req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn,
"method": "nvmf_subsystem_allow_any_host", "req_id": 0}
ret = {"code": -errno.EEXIST, "message": f"All hosts already allowed to {request.subsystem_nqn}"}
else:
self.logger.error(f"Host {request.host_nqn} already added to {request.subsystem_nqn}")
req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn,
"method": "nvmf_subsystem_add_host", "req_id": 0}
ret = {"code": -errno.EEXIST, "message": f"Host {request.host_nqn} already added to {request.subsystem_nqn}"}
msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2),
"Got JSON-RPC error response",
"response:",
json.dumps(ret, indent=2)])
raise Exception(msg)
if request.host_nqn == "*": # Allow any host access to subsystem
self.logger.info(f"Received request to allow any host to"
f" {request.subsystem_nqn}")
Expand Down Expand Up @@ -506,6 +534,13 @@ def remove_host(self, request, context=None):
with self.rpc_lock:
return self.remove_host_safe(request, context)

def matching_listener_exist(self, nqn, gw_name, trtype, traddr, trsvcid) -> bool:
listener_key = "_".join([self.gateway_state.local.LISTENER_PREFIX + nqn, gw_name, trtype, traddr, trsvcid])
if self.get_state().get(listener_key):
return True
else:
return False

def create_listener_safe(self, request, context=None):
"""Creates a listener for a subsystem at a given IP/Port."""
ret = True
Expand All @@ -514,6 +549,20 @@ def create_listener_safe(self, request, context=None):
f" {request.traddr}:{request.trsvcid}.")
try:
if request.gateway_name == self.gateway_name:
listener_already_exist = self.matching_listener_exist(
request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid)
if listener_already_exist:
self.logger.error(f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}")
req = {"nqn": request.nqn, "trtype": request.trtype, "traddr": request.traddr,
"gateway_name": request.gateway_name,
"trsvcid": request.trsvcid, "adrfam": request.adrfam,
"method": "nvmf_subsystem_add_listener", "req_id": 0}
ret = {"code": -errno.EEXIST, "message": f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}"}
msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2),
"Got JSON-RPC error response",
"response:",
json.dumps(ret, indent=2)])
raise Exception(msg)
ret = rpc_nvmf.nvmf_subsystem_add_listener(
self.spdk_rpc_client,
nqn=request.nqn,
Expand All @@ -533,15 +582,21 @@ def create_listener_safe(self, request, context=None):
context.set_details(f"{ex}")
return pb2.req_status()

state = self.gateway_state.omap.get_state()
for key,val in state.items():
if (key.startswith(self.gateway_state.omap.SUBSYSTEM_PREFIX + request.nqn)):
self.logger.debug(f"values of key: {key} val: {val} \n")
req = json_format.Parse(val, pb2.create_subsystem_req())
self.logger.info(f" enable_ha :{req.enable_ha} \n")
break
state = self.get_state()
req = None
subsys = state.get(self.gateway_state.local.SUBSYSTEM_PREFIX + request.nqn)
if subsys:
self.logger.debug(f"value of sub-system: {subsys}")
try:
req = json_format.Parse(subsys, pb2.create_subsystem_req())
self.logger.info(f"enable_ha: {req.enable_ha}")
except Exception:
self.logger.error(f"Got exception trying to parse subsystem: {ex}")
pass
else:
self.logger.info(f"No sub-system for {request.nqn}")

if req.enable_ha:
if req and req.enable_ha:
for x in range (MAX_ANA_GROUPS):
try:
ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def test_create_bdev_ana(self, caplog, gateway):


def test_create_subsystem_ana(self, caplog, gateway):
cli(["create_subsystem", "-n", subsystem, "-a", "true", "-t", "true"])
cli(["create_subsystem", "-n", subsystem, "-a", "-t"])
assert "Failed to create" not in caplog.text
cli(["get_subsystems"])
assert serial not in caplog.text
Expand Down

0 comments on commit c04cf91

Please sign in to comment.